Kafka入门, 消费者组案例(十九)

pom 文件

    <dependencies><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.0.0</version></dependency></dependencies>

独立消费者案例(订阅主语)

在这里插入图片描述
在消费者API代码中,必须配置消费者id。命令行启动消费者不填写消费者组id会被自动填写随机得消费者组id

package com.longer.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {//创建消费者的配置对象Properties properties=new Properties();//2、给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");//配置序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组(组名任意起名)必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//创建消费者对象KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);//注册要消费的主题ArrayList<String> topics=new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);while (true){//设置1s中消费一批数据ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));//打印消费到的数据for(ConsumerRecord<String,String> record:consumerRecords){System.out.println(record);}}}
}

发送的信息
在这里插入图片描述
消费信息(因为我发了好多次)
在这里插入图片描述

独立消费者案例

在这里插入图片描述

package com.longer.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerPartition {public static void main(String[] args) {Properties properties=new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组(组名任意起名)必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer kafkaConsumer=new KafkaConsumer(properties);//消费某个注意的某个分区数据ArrayList<TopicPartition> topicPartitions=new ArrayList<>();topicPartitions.add(new TopicPartition("first",0));kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

在这里插入图片描述

在这里插入图片描述
发送信息发现只消费0分区的信息
在这里插入图片描述

消费者组案例

在这里插入图片描述

package com.longer.consumer.group;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {//创建消费者的配置对象Properties properties=new Properties();//2、给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");//配置序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组(组名任意起名)必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//创建消费者对象KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);//注册要消费的主题ArrayList<String> topics=new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);while (true){//设置1s中消费一批数据ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));//打印消费到的数据for(ConsumerRecord<String,String> record:consumerRecords){System.out.println(record);}}}
}

复制三分,然后运行
在这里插入图片描述在这里插入图片描述

在这里插入图片描述
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/14956.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

nginx四层转发应用

默认使用yum安装的nginx是没有额外安装的动态模块的&#xff0c;需要自己额外安装 ls /usr/lib64/nginx/modules/ 若是不安装stream模块&#xff0c;直接在nginx的配置文件中调用stream模块&#xff0c;重载配置文件的时候会报错识别不到stream功能 安装stream模块 yum insta…

匿名管道的使用示例

目录 整体框架 通信步骤 创建管道 ​编辑创建子进程&关闭相应的fd ​编辑 进程间通信 父子进程通信之间四种场景 实现父亲读&#xff0c;孩子写的进程间通信 管道通信的使用场景样例实现 整体框架 通信步骤 创建管道 pipe的参数为输出型参数&#xff0c;返回读写端…

如何提升问卷数据的有效性?

问卷调查法是收集数据的宝贵工具&#xff0c;可以为商业、社会科学和医疗保健等众多领域的决策过程提供真实可靠的数据信息。然而&#xff0c;问卷数据的准确性和可靠性是影响最终结论的关键因素&#xff0c;而他们取决于问卷设计和数据收集过程的质量。在本文中&#xff0c;我…

zabbix proxy的配置及zabbix实现高可用(监控 windows,java应用,SNMP等)

目录 zabbix proxy 分布式代理服务器部署zabbix proxy 代理服务器部署 Zabbix 高可用集群Zabbix 监控 Windows 系统Zabbix 监控 java 应用Zabbix 监控 SNMP zabbix proxy 分布式代理服务器 zabbix 分布式代理服务器&#xff0c;可以代替zabbix server 采集性能和可用性数据。z…

nginx七层代理和四层转发的理解

先来理解一下osi七层模型 应用层 应用层是ISO七层模型的最高层&#xff0c;它直接与用户和应用程序交互&#xff0c;提供用户与网络的接口。它包括各种应用协议&#xff0c;如HTTP、FTP、SMTP等&#xff0c;用于实现特定应用的功能和通信表示层 表示层…

学生成绩分析项目

数据采集 导入必要的库 import pandas as pd import matplotlib.pyplot as plt import seaborn as sns加载数据集 df pd.read_csv(D:\\桌面\\数据\\student_marks.csv)显示数据框的前几行 # 显示数据框的形状 print("Shape of the dataframe:", df.shape)#显示…

【运维】GitLab相关配置优化等

默认 Git 设置 http post 的缓存为 1MB&#xff0c;使用命令将git的缓存设为500M&#xff0c;重新配置一下postBuffer值 git config --global http.postBuffer 524288000 解决方法2&#xff1a;直接修改config参数&#xff0c; windows: ./git/config中&#xff0c;加入以下…

Freertos-mini智能音箱项目---IO扩展芯片PCA9557

项目上用到的ESP32S3芯片引脚太少&#xff0c;选择了PCA9557扩展IO&#xff0c;通过一路i2c可以扩展出8个IO。这款芯片没有中断输入&#xff0c;所以更适合做扩展输出引脚用&#xff0c;内部寄存器也比较少&#xff0c;只有4个&#xff0c;使用起来很容易。 输入寄存器 输出寄存…

【Linux】高级IO(二)

文章目录 高级IO&#xff08;二&#xff09;I/O多路转接之pollpoll服务器 I/O多路转接之epollepoll相关函数epoll工作原理epoll回调机制epoll服务器epoll的优点 高级IO&#xff08;二&#xff09; I/O多路转接之poll poll也是系统提供的一个多路转接接口 poll系统调用也可以…

基于go-zero的api服务刨析并对比与gin的区别

zero路由与gin的区别 官网go-zero go-zero是一个集成了各种工程实践的微服务框架&#xff0c;集多种功能于一体&#xff0c;如服务主要的API服务&#xff0c;RPC服务等。除了构建微服务工程外&#xff0c;zero也是一款性能优良的web框架&#xff0c;也可以构建单体web应用。 …

最新AI创作系统V5.0.2+支持GPT4+支持ai绘画+实时语音识别输入+文章资讯发布功能+用户会员套餐

最新AI创作系统V5.0.2支持GPT4支持ai绘画实时语音识别输入文章资讯发布功能用户会员套餐&#xff01; AI创作系统一、源码系统介绍二、AI创作系统程序下载三、安装教程四、主要功能展示五、更新日志 AI创作系统 1、提问&#xff1a;程序已经支持GPT3.5、GPT4.0接口 2、支持三种…

Maven 配置本地jar,通过下载第三方jar包,然后手动配置maven jar包依赖 例如:IKExpression

说明&#xff1a;有时候有一些jar包 maven中央仓库和阿里云仓库没有收录的jar包需要手动下载至本地进行手动添加maven依赖&#xff0c;就拿 IK表达式 IKExpression jar 包来说 第一步 下载IKExpression 包 没有这个包的同学可以点击下载阿里云盘分享 第二步 找到自己项目本地…