Kafka基础知识-消费者

news/2025/2/12 16:35:50/文章来源:https://www.cnblogs.com/json-92/p/18711773

一、消费方式

consumer采用pull(拉)的模式从broker中读取数据。

push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由broker决定的。它的目标是尽可能以最快的速度传递消息,但是这样容易造成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式可以根据consumer的消费能力以适当的速率消费消息。

pull模式的不足之处是,如果kafka没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,kafka的消费者在消费数据时会传入一个时长参数timeout,如果当前没有数据可消费,consumer会等待一段时间后再返回。

 

二、分区分配策略

一个consumer group中有多个consumer,一个topic有多个partition,所以必然会涉及到partition的分配问题,即确定哪个partition由哪个consumer来消费。Kafka提供了3种消费者分区分配策略:RangeAssigor、RoundRobinAssignor、StickyAssignor。

PartitionAssignor接口用于用户定义实现分区分配算法,以实现Consumer之间的分区分配。消费组的成员订阅它们感兴趣的Topic并将这种订阅关系传递给作为订阅组协调者的Broker。协调者选择其中的一个消费者来执行这个消费组的分区分配并将分配结果转发给消费组内所有的消费者。Kafka默认采用RangeAssignor的分配算法。

2.1 RangeAssignor

RangeAssignor对每个Topic进行独立的分区分配。对于每一个Topic,首先对分区按照分区ID进行排序,然后订阅这个Topic的消费组的消费者再进行排序,之后尽量均衡的将分区分配给消费者。这里只能是尽量均衡,因为分区数可能无法被消费者数量整除,那么有一些消费者就会多分配到一些分区。分配示意图如下:

分区分配的算法如下:

@Override
public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsPerTopic,Map<String, Subscription> subscriptions) {Map<String, List<String>> consumersPerTopic = consumersPerTopic(subscriptions);Map<String, List<TopicPartition>> assignment = new HashMap<>();for (String memberId : subscriptions.keySet())assignment.put(memberId, new ArrayList<TopicPartition>());//for循环对订阅的多个topic分别进行处理for (Map.Entry<String, List<String>> topicEntry : consumersPerTopic.entrySet()) {String topic = topicEntry.getKey();List<String> consumersForTopic = topicEntry.getValue();Integer numPartitionsForTopic = partitionsPerTopic.get(topic);if (numPartitionsForTopic == null)continue;//对消费者进行排序Collections.sort(consumersForTopic);//计算平均每个消费者分配的分区数int numPartitionsPerConsumer = numPartitionsForTopic / consumersForTopic.size();//计算平均分配后多出的分区数int consumersWithExtraPartition = numPartitionsForTopic % consumersForTopic.size();List<TopicPartition> partitions = AbstractPartitionAssignor.partitions(topic, numPartitionsForTopic);for (int i = 0, n = consumersForTopic.size(); i < n; i++) {//计算第i个消费者,分配分区的起始位置int start = numPartitionsPerConsumer * i + Math.min(i, consumersWithExtraPartition);//计算第i个消费者,分配到的分区数量int length = numPartitionsPerConsumer + (i + 1 > consumersWithExtraPartition ? 0 : 1);assignment.get(consumersForTopic.get(i)).addAll(partitions.subList(start, start + length));}}return assignment;
}

  

这种分配方式明显的一个问题是随着消费者订阅的Topic的数量的增加,不均衡的问题会越来越严重,比如上图中4个分区3个消费者的场景,C0会多分配一个分区。如果此时再订阅一个分区数为4的Topic,那么C0又会比C1、C2多分配一个分区,这样C0总共就比C1、C2多分配两个分区了,而且随着Topic的增加,这个情况会越来越严重。分配结果:

订阅2个Topic,每个Topic4个分区,共3个Consumer

  • C0:[T0P0,T0P1,T1P0,T1P1]
  • C1:[T0P2,T1P2]
  • C2:[T0P3,T1P3]

2.2 RoundRobinAssignor

RoundRobinAssignor的分配策略是将消费组内订阅的所有Topic的分区及所有消费者进行排序后尽量均衡的分配(RangeAssignor是针对单个Topic的分区进行排序分配的)。如果消费组内,消费者订阅的Topic列表是相同的(每个消费者都订阅了相同的Topic),那么分配结果是尽量均衡的(消费者之间分配到的分区数的差值不会超过1)。如果订阅的Topic列表是不同的,那么分配结果是不保证“尽量均衡”的,因为某些消费者不参与一些Topic的分配。

以上两个topic的情况,相比于之前RangeAssignor的分配策略,可以使分区分配的更均衡。不过考虑这种情况,假设有三个消费者分别为C0、C1、C2,有3个Topic T0、T1、T2,分别拥有1、2、3个分区,并且C0订阅T0,C1订阅T0和T1,C2订阅T0、T1、T2,那么RoundRobinAssignor的分配结果如下:

看上去分配已经尽量的保证均衡了,不过可以发现C2承担了4个分区的消费而C1订阅了T1,是不是把T1P1交给C1消费能更加的均衡呢?

2.3 StickyAssignor

StickyAssignor分区分配算法,目的是在执行一次新的分配时,能在上一次分配的结果的基础上,尽量少的调整分区分配的变动,节省因分区分配变化带来的开销。Sticky是“粘性的”,可以理解为分配结果是带“粘性的”——每一次分配变更相对上一次分配做最少的变动。其目标有两点:

  • 分区的分配尽量的均衡。
  • 每一次重分配的结果尽量与上一次分配结果保持一致。

当这两个目标发生冲突时,优先保证第一个目标。第一个目标是每个分配算法都尽量尝试去完成的,而第二个目标才真正体现出StickyAssignor特性的。

StickyAssignor算法比较复杂,下面举例来说明分配的效果(对比RoundRobinAssignor),前提条件:

  • 有4个Topic:T0、T1、T2、T3,每个Topic有2个分区。
  • 有3个Consumer:C0、C1、C2,所有Consumer都订阅了这4个分区。

上面红色的箭头代表的是有变动的分区分配,可以看出,StickyAssignor的分配策略,变动较小。

 

三.Offset

由于Consumer在消费过程中可能会出现断电宕机等故障,Consumer恢复后,需要从故障前的位置继续消费,所以Consumer需要实时记录自己消费到哪个位置,以便故障恢复后继续消费。Kafka0.9版本之前,Consumer默认将offset保存在zookeeper中,从0.9版本开始,Consumer默认将offset保存在Kafka一个内置的名字叫_consumeroffsets的topic中。默认是无法读取的,可以通过设置consumer.properties中的exclude.internal.topics=false来读取。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/882623.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

记 Quartz 定时任务导致内存泄露(OOM)问题

公司的系统一直运行正常,但是某一天同时反馈采集系统出现了没有办法正常运行的问题。经过查看日志定位到后台报错 OOM[2025-02-04 18:10:44.497] [] [[34m INFO[0;39m] LocalDataSourceJobStore:968] Handling the first 1 triggers that missed their scheduled fire-time. M…

nginx 简单实践:正向代理、反向代理【nginx 实践系列之二】

本文为 nginx 简单实践系列文章之一,主要简单实践了两个内容:正向代理、反向代理的简单实现,仅供参考。〇、前言 本文为 nginx 简单实践系列文章之二,主要简单实践了两个内容:正向代理、反向代理,仅供参考。 关于 Nginx 基础,以及安装和配置详解,可以参考博主过往文章:…

KubeSphere 和 K8s 高可用集群离线部署全攻略

本文首发:运维有术,作者术哥。 今天,我们将一起探索如何在离线环境中部署 K8s v1.30.6 和 KubeSphere v4.1.2 高可用集群。对于离线环境的镜像仓库管理,官方推荐使用 Harbor 作为镜像仓库管理工具,它为企业级用户提供了高效、安全的镜像存储方案。而在本指南中,我们将以 …

《刚刚问世》系列初窥篇-Java+Playwright自动化测试-13- iframe操作-中篇(详细教程)

1.简介 按照计划今天就要用实际的例子进行iframe自动化测试。宏哥还是用之前找到的一个含有iframe的网页(QQ邮箱和163邮箱),别的邮箱宏哥就没有细看了,可能后期这两个邮箱页面优化升级,也就不能用了,但是现在还可以用。所以今天这一篇的主要内容就是用这两个网页的iframe…

炸裂:SpringAI内置DeepSeek啦!

好消息,Spring AI 最新快照版已经内置 DeepSeek 了,所以以后项目中对接 DeepSeek 就方便多了。但因为快照版会有很多 Bug,所以今天咱们就来看稳定版的 Spring AI 如何对接 DeepSeek 满血版。 SpringAI和DeepSeek介绍 Spring AI 是 Spring 生态系统中的一个重要项目,旨在将人…

openssl生成证书请求时报错:invalid extension string:v3_conf.c:140:name=subjectAltName,section=@alt_names

解决方法: 1. 使用自定义的openssl-ca.cnf openssl req -new -key client.key -out client.csr -subj "/C=cn/ST=nanjing/L=nanjing/O=zte/OU=zte/CN=localhost" -config ./openssl-ca.cnf2. 修改系统的/etc/pki/tls/openssl.cnf 注释掉[ v3_req ]段部分 # [ v3_req…

linux中文件操作相关命令和使用

linux中文件操作相关命令和使用 cat more/less head/tail vi/vim等相关命令 cat 1. 文件查看 cat 文件名 --全部内容显示 -- 好东西 可以用于配置文件查看过滤 cat 文件名 | grep 筛选内容 --内容筛选显示 2. 文件创建并输入初始化文本 -- 新建content.txt 文…

APIPark 新增 AI 大模型负载均衡,APIKey 资源池以及 AI Token 消耗统计等重磅功能!

开发者们好!APIPark V1.4 功能更新给大家带来「负载均衡」、「APIKey 资源池」以及「Token 消耗统计」等重要功能,看看是否能帮助到大家更好地使用 AI 大模型~ V1.4 版本说明新功能 [❤️新增] 新增支持 AI 模型负载均衡:同时接入多款大模型,当原定的 AI 服务商无法访问时,…

活动营销系统

一、整体架构图二、核心业务系统介绍 2.1.接入层统一异常处理逻辑2.2.邀请服务逻辑2.3.权益发放服务2.4.排行榜服务 2.4.1.榜单服务数据结构 数据结构分为两块:配置中心数据,因为排行榜没有后台配置平台,只能将配置数据放到配置中心,具备实时更改配置的能力 数据表,主要是排行榜…

004 条件渲染

1、v-ifv-if指令用于条件性地渲染一块内容。这块内容只会在指令的表达式返回true值的时候被渲染。<p v-if="flag">我是孙猴子</p>data(){return{flag:true}} 2、v-else你可以使用 v-else 指令来表示 v-if 的"else块"<p v-if="flag&quo…

Tinyfox 简易教程之:Hello World!

Tinyfox程序设计系列教程之入门篇一,什么是 Tinyfox: Tinyfox 是一款自带 HTTP 服务器的以 WebApi、WebSocket 及“动态HTML”为核心功能的超轻量级的独立性极强的高性能 Web 应用程序基础框架。 Tinyfox 简单易用,性能强劲,跨平台,既支持 Linux 也支持 Windows,既支持 x…

中电金信:更智能、更精准、更专业,中电金信AI产品全栈接入DeepSeek

短短两周的时间,AI届新星DeepSeek凭借低成本、高性能和开源策略,以“中国速度”席卷全网,逆袭成为全球用户量增速最快的AI模型。作为大模型领域的一匹黑马,DeepSeek为千行百业提供了AI解决方案的新选择。2025年开工首日,中电金信研究院便第一时间在产品平台、智能应用、智…