【kafka】Java客户端代码demo:自动异步提交、手动同步提交及提交颗粒度、动态负载均衡

一,代码及配置项介绍

kafka版本为3.6,部署在3台linux上。

maven依赖如下:

        <!-- kafka --><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.6.0</version></dependency>

生产者、消费者和topic代码如下:

    String topic = "items-01";@Testpublic void producer() throws ExecutionException, InterruptedException {Properties p = new Properties();p.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");p.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());p.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());p.setProperty(ProducerConfig.ACKS_CONFIG, "-1");KafkaProducer<String, String> producer = new KafkaProducer<String, String>(p);while(true){for (int i = 0; i < 3; i++) {for (int j = 0; j <3; j++) {ProducerRecord<String, String> record = new ProducerRecord<>(topic, "item"+j,"val" + i);Future<RecordMetadata> send = producer.send(record);RecordMetadata rm = send.get();int partition = rm.partition();long offset = rm.offset();System.out.println("key: "+ record.key()+" val: "+record.value()+" partition: "+partition + " offset: "+offset);}}}}@Testpublic void consumer(){//基础配置Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//消费的细节String group = "user-center";properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);//KAKFA IS MQ  IS STORAGE/***         "What to do when there is no initial offset in Kafka or if the current offset*         does not exist any more on the server*         (e.g. because that data has been deleted):*         <ul>*             <li>earliest: automatically reset the offset to the earliest offset*             <li>latest: automatically reset the offset to the latest offset</li>*             <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li>*         </ul>";*/properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//第一次启动,米有offset//自动提交时异步提交,丢数据&&重复数据properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//        p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
//        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(0));if (!records.isEmpty()){System.out.println();System.out.println("-----------------" + records.count() + "------------------------------");Iterator<ConsumerRecord<String,String>> iterator = records.iterator();while (iterator.hasNext()){ConsumerRecord<String,String> record = iterator.next();int partition = record.partition();long offset = record.offset();System.out.println("key: " + record.key() + " val: " + record.value() + " partition: " + partition + " offset: " + offset);}}}}

这里先简单解释一下,kafka的topic只是一个逻辑上的概念,实际上的物理存储是依赖分布在broker中的分区partition来完成的。kafka依赖的zk中有一个__consumer_offsets[1]话题,存储了所有consumer和group消费的进度,包括当前消费到的进度current-offset、kafka写入磁盘的日志中记录的消息的末尾log-end-offset

kafka根据消息的key进行哈希取模的结果来将消息分配到不同的partition,partition才是consumer拉取的对象。每次consumer拉取,都是从一个partition中拉取。(这一点,大家可以自己去验证一下)

在这里插入图片描述

下面代码,是描述的当consumer第一次启动时,在kafka中还没有消费的记录,此时current-offset为"-"时,consumer应如何拉取数据的行为。有3个值可选,latest、earliest、none。

当设置如下配置为latest,没有current-offset时,只拉取consumer启动后的新消息。
earliest,没有current-offset时,从头开始拉取消息消费。
node,没有current-offset时,抛异常。

它们的共同点就是current-offset有值时,自然都会按照current-offset拉取消息。

properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");

下面代码,true表示设置的异步自动提交,false为手动提交。

properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");

下面代码,设置的是自动提交时,要过多少秒去异步自动提交。

properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");

下面代码,是设置kafka批量拉取多少数据,默认的应该是最大500,小于500。 kafka可以批量的拉取数据,这样可以节省网卡资源。

properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");

二、异步自动提交

部分设置项如下:

        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//自动提交时异步提交,丢数据&&重复数据properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10"); 

开启生产者,生产出一些消息,可以看到之前拉取完数据的group又有了新的数据。
在这里插入图片描述
开启消费者,可以看到消息被消费掉。
在这里插入图片描述

因为提交是异步的,我们需要需要为了业务代码留出处理时间。所以需要设置异步提交时间。

假设在间隔时间(AUTO_COMMIT_INTERVAL_MS_CONFIG,自动提交间隔毫秒数配置)内,还没有提交的时候,消费过数据假设数据,consumer挂了。那么consumer再次启动时,从kafka拉取数据,就会因为还没有提交offset,而重新拉取消费过的数据,导致重复消费。

假设现在已经过了延间隔时间,提交成功了,但是业务还没有完成,并且在提交后失败了。那么这个消费失败的消息也不会被重新消费了,导致丢失消息。

为了解决上述的问题,可以使用手动同步提交。

三、手动同步提交

假设我们现在是按照批量拉取,下面介绍2种提交粒度的demo,粒度由小到大,分别是按条提交,按partition提交 && 按批次提交。

3.1 按条提交

public void test1(){//基础配置Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//消费的细节String group = "user-center";properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);//KAKFA IS MQ  IS STORAGE/***         "What to do when there is no initial offset in Kafka or if the current offset*         does not exist any more on the server*         (e.g. because that data has been deleted):*         <ul>*             <li>earliest: automatically reset the offset to the earliest offset*             <li>latest: automatically reset the offset to the latest offset</li>*             <li>none: throw exception to the consumer if no previous offset is found for the consumer's group</li><li>anything else: throw exception to the consumer.</li>*         </ul>";*/properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//第一次启动,米有offset//自动提交时异步提交,丢数据&&重复数据properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");//        p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
//        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(0));if (!records.isEmpty()){System.out.println();System.out.println("-----------------" + records.count() + "------------------------------");Iterator<ConsumerRecord<String,String>> iterator = records.iterator();while (iterator.hasNext()){ConsumerRecord<String,String> next = iterator.next();int p = next.partition();long offset = next.offset();String key = next.key();String value = next.value();System.out.println("key: " + key + " val: " + value + " partition: " + p + " offset: " + offset);TopicPartition sp = new TopicPartition(topic,p);OffsetAndMetadata om = new OffsetAndMetadata(offset);HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();map.put(sp,om);consumer.commitSync(map);}}}}

3.2 按partition提交 && 按批次提交

由于消费者每一次拉取都是从一个partition中拉取,所以其实按partition拉取和按批次拉取,是一回事。整体成功

    @Testpublic void test2(){//基础配置Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//消费的细节String group = "user-center";properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//自动提交时异步提交,丢数据&&重复数据properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");//        p.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");
//        p.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);consumer.subscribe(Arrays.asList(topic));while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(0));if (!records.isEmpty()){System.out.println();System.out.println("-----------------" + records.count() + "------------------------------");Set<TopicPartition> partitions = records.partitions();for (TopicPartition partition : partitions){List<ConsumerRecord<String,String>> pRecords = records.records(partition);Iterator<ConsumerRecord<String,String>> pIterator = pRecords.iterator();while (pIterator.hasNext()){ConsumerRecord<String,String> next = pIterator.next();int p = next.partition();long offset = next.offset();String key = next.key();String value = next.value();System.out.println("key: " + key + " val: " + value + " partition: " + p + " offset: " + offset);}//按partition提交long offset = pRecords.get(pRecords.size() - 1).offset();OffsetAndMetadata om = new OffsetAndMetadata(offset);HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();map.put(partition,om);consumer.commitSync(map);}//按批次提交
//                consumer.commitSync();}}}

四,动态负载均衡

我们知道,对于一个topic,一个group中,为了保证消息的顺序性,默认只能有一个consumer来消费。假设我们有3台消费者,那么此时,另外2台消费者就会闲着不干活。有没有可能能够既保证消费消息的顺序性,又能够提升性能呢?

答案就是kafka的动态负载均衡

前面提到了,producer会根据消息的key的哈希取模的结果来把消息分配到某个partition,也就是说同一个key的消息,只存在于一个partition中。而且消费者拉取消息,一个批次,只从一个partition中拉取消息

假设我们现在有一个topic,有2个partition。那么我们可不可以在组内3台消费者中,挑2台出来,各自对应这个topic的2个partition,这样消费者和partition一一对应。既能保证消息的顺序性,又能够提升性能。这就是kafka的动态负载均衡。

代码如下:

    //动态负载均衡@Testpublic void test3(){//基础配置Properties properties = new Properties();properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.184.129:9092,192.168.184.130:9092,192.168.184.131:9092");properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());//消费的细节String group = "user-center";properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG,group);properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");//自动提交时异步提交,丢数据&&重复数据properties.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");properties.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"15000");properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"10");KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);//kafka 的consumer会动态负载均衡consumer.subscribe(Arrays.asList(topic), new ConsumerRebalanceListener() {//Revoked,取消的回调函数@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {System.out.println("---onPartitionsRevoked:");Iterator<TopicPartition> iter = partitions.iterator();while(iter.hasNext()){System.out.println(iter.next().partition());}System.out.println();}//Assigned 指定的回调函数@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {System.out.println("---onPartitionsAssigned:");Iterator<TopicPartition> iter = partitions.iterator();while(iter.hasNext()){System.out.println(iter.next().partition());}System.out.println();}});while(true){ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(0));if (!records.isEmpty()){System.out.println();System.out.println("-----------------" + records.count() + "------------------------------");Iterator<ConsumerRecord<String,String>> iterator = records.iterator();while (iterator.hasNext()){ConsumerRecord<String,String> next = iterator.next();int p = next.partition();long offset = next.offset();String key = next.key();String value = next.value();System.out.println("key: " + key + " val: " + value + " partition: " + p + " offset: " + offset);TopicPartition sp = new TopicPartition(topic,p);OffsetAndMetadata om = new OffsetAndMetadata(offset);HashMap<TopicPartition, OffsetAndMetadata> map = new HashMap<>();map.put(sp,om);consumer.commitSync(map);}}}}

上述代码在订阅时,加了一个ConsumerRebalanceListener监听器,实现了2个回调函数onPartitionsRevoked和onPartitionsAssigned,分别是取消组内消费者负载均衡时触发的回调函数,和指定组内消费者加入负载均衡时触发的回调函数。

在使用动态负载均衡时,需要注意的是,在提交时不要批量提交,否则会报错如下,暂时还没有研究问题原因,有了结果会回来更新的。

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.

先打开一个消费者A,触发了回调函数onPartitionsAssigned,可以看到partition0 和 partition1都被分配到了A上。在这里插入图片描述

此时打开生产者,可以看到partition0和1的消息都发送到了A上。
在这里插入图片描述

我们再打开一个同一个组内的消费者B。
可以看到A取消了partition0和1的分配,被指定了partition0。消费者B则被指定了partition1.在这里插入图片描述在这里插入图片描述

再次打开生产者去生产消息,这次A只消费partition 0的消息,B只消费partition1的消息。
在这里插入图片描述
在这里插入图片描述

如果我们再启动组内第3台消费者,那么组内消费者会再次负载均衡。由于这个topic只有2个partition,所以即使启动3台组内的消费者,也最多只有2个消费者被分配给某个partition,剩余1个消费者不参与负载均衡
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

参考文章:
[1],【kafka】记一次kafka基于linux的原生命令的使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/166796.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

本地生活餐饮视频怎么拍摄能有更多流量?如何批量生产呢?

本地生活近几年特别的火&#xff0c;所以到现在各类内容雷同性也比较高&#xff0c;视频缺少新的创意和玩法&#xff0c;像餐饮店的视频&#xff0c;大部分都是拍顾客进门、拍餐饮店座无虚席的实景……作为用户&#xff0c;其实早就已经看腻了。 今天推荐本地生活餐饮店商家拍…

前端缓存机制——强缓存、弱缓存、启发式缓存

强缓存和弱缓存的主要区别是主要区别在于缓存头携带的信息不同。 强缓存&#xff1a; 浏览器发起请求&#xff0c;查询浏览器的本地缓存&#xff0c;如果找到资源&#xff0c;则直接在浏览器中使用该资源。若是未找到&#xff0c;或者资源已过期&#xff0c;则浏览器缓存返回未…

5G-A 商用加速,赋能工业互联网

2019 年 6 月&#xff0c;中国工业和信息化部发放 5G 商用牌照。同年 10 月&#xff0c;三大运营商公布 5G 商用套餐&#xff0c;11 月 1 日正式上线 5G 商用套餐&#xff0c;标志中国正式进入 5G 商用新纪元。今年是 5G 商用的第五年&#xff0c;在当前数字经济蓬勃发展的催化…

【springboot配置项动态刷新】与【yaml文件转换为java对象】

文章目录 一&#xff0c;序言二&#xff0c;准备工作1. pom.xml引入组件2. 配置文件示例 三&#xff0c;自定义配置项动态刷新编码实现1. 定义自定义配置项对象2. 添加注解实现启动时自动注入3. 实现yml文件监听以及文件变化处理 四&#xff0c;yaml文件转换为java对象1. 无法使…

js实现向上、向下、向左、向右无缝滚动

向左滚动 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-width, ini…

【MySQL习题】各个视频的平均完播率【全网最详细教学】

目录 数据表描述 问题描述 输出示例 解题思路【重点】 正解代码 数据表描述 有以下两张表&#xff1a; 表1&#xff1a;用户-视频互动表tb_user_video_log 数据举例&#xff1a; 说明&#xff1a; uid-用户ID,video_id-视频ID start_time-开始观看时间end_time-结束观…

【图像分类】【深度学习】【Pytorch版本】GoogLeNet(InceptionV1)模型算法详解

【图像分类】【深度学习】【Pytorch版本】GoogLeNet(InceptionV1)模型算法详解 文章目录 【图像分类】【深度学习】【Pytorch版本】GoogLeNet(InceptionV1)模型算法详解前言GoogLeNet讲解Inception结构InceptionV1结构1x1卷积的作用辅助分类器 GoogLeNet模型结构GoogLeNet Pyto…

SPSS曲线回归

前言&#xff1a; 本专栏参考教材为《SPSS22.0从入门到精通》&#xff0c;由于软件版本原因&#xff0c;部分内容有所改变&#xff0c;为适应软件版本的变化&#xff0c;特此创作此专栏便于大家学习。本专栏使用软件为&#xff1a;SPSS25.0 本专栏所有的数据文件请点击此链接下…

Java2 - 数据结构

5 数据类型 5.1 整数类型 在Java中&#xff0c;数据类型用于定义变量或表达式可以存储的数据的类型。Java的数据类型可分为两大类&#xff1a;基本数据类型和引用数据类型。 byte&#xff0c;字节 【1字节】表示范围&#xff1a;-128 ~ 127 即&#xff1a;-2^7 ~ 2^7 -1 sho…

三菱FX3U系列-定位指令

目录 一、简介 二、指令形式 1、相对定位[DRVI、DDRVI] 2、绝对定位[DRVA、DDRVA] 三、总结 一、简介 定位指令用于控制伺服电机或步进电机的位置移动。可以通过改变脉冲频率和脉冲数量来控制电机的移动速度和移动距离&#xff0c;同时还可以指定移动的方向。 二、指令形…

UE5、CesiumForUnreal实现加载GeoJson绘制单面(Polygon)功能(StaticMesh方式)

文章目录 1.实现目标2.实现过程2.1 实现原理2.1.1 数据读取2.1.2 三角剖分2.1.3 创建StaticMesh2.2 应用测试2.2.1 具体代码2.2.2 蓝图应用测试3.参考资料1.实现目标 通过读取本地GeoJson数据,在UE中以StaticMeshComponent的形式绘制出面数据,支持Editor和Runtime环境,GIF动…

kafka和rocketMq的区别

kafka topic 中每一个分区会有 Leader 与 Follow。Kafka 的内部机制可以保证 topic 某一个分区的 Leader 与 Follow 不在同一台机器上 Leader 节点承担一个分区的读写&#xff0c;Follow 节点只负责数据备份 如果 Leader 分区所在的 Broker 节点宕机&#xff0c;会触发主从节…