Kafka入门,分区的分配再平衡(二十)

分区的分配以及再平衡

在这里插入图片描述

1、kafka有四种主流的分区策略:Range,RoundRobin,Sticky,CooperativeSticky。可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Ranage+CooperativeSticky。Kafka可以同事使用多个分区分配策略。

参数描述
heartbeat.interval.msKafka消费者和coordinator之间的心跳时间,默认3s。该条目的值必须小于session.timeout.ms,也不应该高于session.timeoyt.ms的1/3
session.timeout.msKafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者执行再平衡
max.poll.interval.ms消费者处理消息的最大时长,默认是5分钟,超过该值被移除,消费者执行再平衡
partition.assignment.strategy消费者分区分配策略,默认策略是Range+CooperativeStickt。Kafka可以同事使用多个分区分配策略。可以选择策略包括:Range,RoundRobin,sticky,CooperativeSticky

Range以及再平衡

在这里插入图片描述

Range分区策略原理
Range是对每个topic而言
首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。
加入现在有7个分区,3个消费者,排序后的分区将会是0,1,2,3,4,5,6消费者排序完之后将会是C0,C1,C2
通过partitions数/consumer数来决定每个消费者应该消费几个分区,如果除不尽,那么前面几个消费者将会多消费一个分区。
例如。7/3=2余1,除不尽,那么消费者C0便会多消费者1个分区。8/3=2余2,除不尽,那么C0和C1分别多消费一个。
注意:如果只是针对一个topic而言,C0消费者多一个分区影响不是很大,但是如果有N个topic,那么针对每个topic,消费者C0都将多消费一个分区,topic越多,C0消费的分区会比其他消费者明显多消费N个分区
容易尝试数据倾斜
测试代码

package com.longer.range;import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;import java.util.Properties;/*** 测试指定分区(partition)*/
public class Producer {public static void main(String[] args) throws InterruptedException {//1、创建kafka生产者得配置对象Properties properties=new Properties();//2、给kafka配置对象添加配置信息properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");//3、key value 序列化properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());//4、创建kafka生产者对象KafkaProducer<String,String> producer=new KafkaProducer<String, String>(properties);for (int i = 0; i < 500; i++) {//指定数据发送到1号分区,key为空(IDEA中,ctrl+p查看参数)producer.send(new ProducerRecord<>("two", "longer " + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception e) {if(e==null){System.out.println(String.format("主题:%s,分区:%s",metadata.topic(),metadata.partition()));return;}e.printStackTrace();}});Thread.sleep(1000);}//关闭资源producer.close();}
}
package com.longer.range;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {//创建消费者的配置对象Properties properties=new Properties();//2、给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");//配置序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组(组名任意起名)必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//创建消费者对象KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);//注册要消费的主题ArrayList<String> topics=new ArrayList<>();topics.add("two");kafkaConsumer.subscribe(topics);while (true){//设置1s中消费一批数据ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));//打印消费到的数据for(ConsumerRecord<String,String> record:consumerRecords){System.out.println(record);}}}
}

用一个消费者每一秒发送一条信息,三个消费者接收。观察打印情况。再停止其中一个消费者,再观察情况。
(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 3、4 号分区数据。
2 号消费者:消费到 5、6 号分区数据。
0 号消费者的任务会整体被分配到 1 号消费者或者 2 号消费者。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、1、2、3 号分区数据。
2 号消费者:消费到 4、5、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

RoundRobin 以及再平衡

在这里插入图片描述
RoundRobin针对集群所有Topic而言
RoundRobin沦陷分区策略,是把所有的partition和所有的consumer都列出来,然后按照hashcode而进行排序,最后通过沦陷算法来分配partition给各个消费者。
修改分区策略

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");

测试代码

public class CustomConsumer1 {public static void main(String[] args) {//创建消费者的配置对象Properties properties=new Properties();//2、给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");//配置序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组(组名任意起名)必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//修改分区策略properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");//创建消费者对象KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);//注册要消费的主题ArrayList<String> topics=new ArrayList<>();topics.add("two");kafkaConsumer.subscribe(topics);while (true){//设置1s中消费一批数据ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));//打印消费到的数据for(ConsumerRecord<String,String> record:consumerRecords){System.out.println(record);}}}
}

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 2、5 号分区数据
2 号消费者:消费到 4、1 号分区数据
0 号消费者的任务会按照 RoundRobin 的方式,把数据轮询分成 0 、6 和 3 号分区数据,
分别由 1 号消费者或者 2 号消费者消费。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 0、2、4、6 号分区数据
2 号消费者:消费到 1、3、5 号分区数据
说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有”粘性的“,即再执行一次新的分配之前,考虑上一次分配的结果,尽量少的调整分配变动,可以节省大量的开销。
粘性分区时Kafka从0.11.x版本开始引入这种分配策略,首先会尽量保持原有分配的分区不变化
测试代码

package com.longer.sticky;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {//创建消费者的配置对象Properties properties=new Properties();//2、给消费者配置对象添加参数properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop100:9092");//配置序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//配置消费者组(组名任意起名)必须properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//修改分区策略properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.StickyAssignor");//创建消费者对象KafkaConsumer<String,String> kafkaConsumer=new KafkaConsumer<String, String>(properties);//注册要消费的主题ArrayList<String> topics=new ArrayList<>();topics.add("two");kafkaConsumer.subscribe(topics);while (true){//设置1s中消费一批数据ConsumerRecords<String,String> consumerRecords=kafkaConsumer.poll(Duration.ofSeconds(1));//打印消费到的数据for(ConsumerRecord<String,String> record:consumerRecords){System.out.println(record);}}}
}

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。
1 号消费者:消费到 2、5、3 号分区数据。
2 号消费者:消费到 4、6 号分区数据。
0 号消费者的任务会按照粘性规则,尽可能均衡的随机分成 0 和 1 号分区数据,分别
由 1 号消费者或者 2 号消费者消费。
说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需
要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
(2)再次重新发送消息观看结果(45s 以后)。
1 号消费者:消费到 2、3、5 号分区数据。
2 号消费者:消费到 0、1、4、6 号分区数据。
说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配

总结

range会造成数据倾斜,RoundRobin不会造成,但是分区调整不会考虑最小变动。sticky,尽量少的调整分配变动,可以节省大量的开销。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/18214.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Pycharm设置Python文件页眉默认信息

次点击File->Settings->Editor->File and Code Templates->然后选择Python script. 后将下列代码复制到右边的框框中&#xff0c;然后选择apply应用&#xff0c;就可以啦 ##!/usr/bin/python3 # -*- coding: utf-8 -*- # Time : ${DATE} ${TIME} # Author : 作者…

【Django】列表页面的搜索功能

目的 页面列表增加多字段搜索显示查询结果 方案 分页显示搜索结果 效果 实现 列表页面 # list.html <div class"pull-left" style"margin-bottom: 10px"><form action"{% url api-search %}" method"get"><div …

MQ集群搭建

1.⾸先&#xff0c;克隆⼀台IP地址为192.168.230.132的虚拟机&#xff0c;然后参考章节的内 容&#xff0c;在该主机上安装RabbitMQ等环境。再加上之前的虚拟机&#xff0c;这样我们就准备好了两台Linux服 务器。 修改/etc/hosts映射⽂件。 vim /etc/hosts 3.两台Linux主机修改…

微软开源社区上线,能够给微软Win95等“上古系统”打补丁

日前一个基于社区的项目“Windows Update Restored”上线&#xff0c;据了解该项目的目的是为老系统重新提供对Windows Update的支持&#xff0c;可为 Windows 95 / NT 4.0/98(包括 SE)/ME/ 2000 SP2 等“上古时期”的微软操作系统提供升级补丁、修复 bug 或安全漏洞。 据悉&a…

k8s 就绪探针

【k8s 系列】k8s 学习二十&#xff0c;就绪探针 提起探针&#xff0c;不知兄dei 们是否有印象&#xff0c;之前我们分享过存活探针&#xff0c;分享存活探针是如何确保异常容器自动重启来保持应用程序的正常运行&#xff0c;感兴趣的可以查看文章 k8s 系列k8s 学习十七&#x…

masm32 链接问题

当我参考资料学习Windows环境下32位汇编时&#xff0c;编译倒没什么问题&#xff0c;可是链接就不对了。 网上也没找到解决办法&#xff0c;经过后来思考了一下&#xff0c;会不会是link.exe这个玩意有问题&#xff1f;比如有多个&#xff1f; 所以使用link的绝对路径试了一下…

【Hello mysql】 mysql的基本查询(二)

Mysql专栏&#xff1a;Mysql 本篇博客简介&#xff1a;介绍mysql的基本查询 mysql的基本查询&#xff08;二&#xff09; 将筛选出来的数据插入到数据库中&#xff08;insertselect&#xff09;聚合函数统计班级共有多少同学统计班级手机的qq号有多少统计本次考试去重的数学成绩…

【Linux】设置 命令 --help 帮助文件为中文

&#x1f341;博主简介 &#x1f3c5;云计算领域优质创作者   &#x1f3c5;华为云开发者社区专家博主   &#x1f3c5;阿里云开发者社区专家博主 &#x1f48a;交流社区&#xff1a;运维交流社区 欢迎大家的加入&#xff01; 文章目录 前言设置系统默认语言为中文安装man-…

Redis — 不仅仅是缓存

1*qIy3PMmEWNcD9Czh_21C8g.png Redis是一种快速、开源的内存键值&#xff08;NoSQL&#xff09;数据库&#xff0c;远远超越了缓存的功能。Redis使用RAM进行操作&#xff0c;提供亚毫秒级的响应时间&#xff0c;支持每秒数百万次请求。Redis主要用于缓存&#xff0c;但它也可以…

设置和使用DragGAN:搭建非官方的演示版

ragGAN的官方版还没有发布&#xff0c;但是已经有非官方版的实现了&#xff0c;我们看看如何使用。DragGAN不仅让GAN重新回到竞争轨道上&#xff0c;而且为GAN图像处理开辟了新的可能性。正式版本将于本月发布。但是现在已经可以在一个非官方的演示中试用这个新工具了 DragGAN …

GitKraken 6.5.1免费中文版安装

今天发现SmartGit上传不了代码了, 看了一下过期了, 我不想花钱买, 就找个替代工具, 方便写代码, 方便合并代码, 方便点击提交代码, 免得敲命令浪费时间. 安装 6.5.1 版本 下载版本,已上传到 CSDN :GitKraken 6.5.1免费中文版安装更换快捷方式 C:\Users\kentrl\AppData\Local\…

Linux进程信号(一)

信号产生 1.信号基础知识2.初步认识信号3.signal函数4.技术应用角度的信号5.调用系统函数向进程发信号6.由软件条件产生的信号7.硬件异常产生信号8.core &#x1f31f;&#x1f31f;hello&#xff0c;各位读者大大们你们好呀&#x1f31f;&#x1f31f; &#x1f680;&#x1f…