六、consumer源码解读

Consumer源码解读

本课程的核心技术点如下:

1、consumer初始化
2、如何选举Consumer Leader
3、Consumer Leader是如何制定分区方案

4、Consumer如何拉取数据
5、Consumer的自动偏移量提交

Consumer初始化

image.png

从KafkaConsumer的构造方法出发,我们跟踪到核心实现方法

image.png

这个方法的前面代码部分都是一些配置,我们分析源码要抓核心,我把核心代码给摘出来

NetworkClient

Consumer与Broker的核心通讯组件

image.png

ConsumerCoordinator

协调器,在Kafka消费中是组消费,协调器在具体进行消费之前要做很多的组织协调工作。

image.png

Fetcher

提取器,因为Kafka消费是拉数据的,所以这个Fetcher就是拉取数据的核心类

image.png

而在这个核心类中,我们发现有很多很多的参数设置,这些就跟我们平时进行消费的时候配置有关系了,这里我们挑一些核心重点参数来讲一讲

fetch.min.bytes

每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。缺省为1个字节。多消费者下,可以设大这个值,以降低broker的工作负载。

fetch.max.bytes

每次fetch请求时,server应该返回的最大字节数。这个参数决定了可以成功消费到的最大数据。

比如这个参数设置的是50M,那么consumer能成功消费50M以下的数据,但是最终会卡在消费大于10M的数据上无限重试。fetch.max.bytes一定要设置到大于等于最大单条数据的大小才行。

默认是50M

image.png

fetch.wait.max.ms

如果没有足够的数据能够满足fetch.min.bytes,则此项配置是指在应答fetch请求之前,server会阻塞的最大时间。缺省为500个毫秒。和上面的fetch.min.bytes结合起来,要么满足数据的大小,要么满足时间,就看哪个条件先满足。

这里说一下参数的默认值如何去找:

image.png

image.png

max.partition.fetch.bytes

指定了服务器从每个分区里返回给消费者的最大字节数,默认1MB。

假设一个主题有20个分区和5个消费者,那么每个消费者至少要有4MB的可用内存来接收记录,而且一旦有消费者崩溃,这个内存还需更大。注意,这个参数要比服务器的message.max.bytes更大,否则消费者可能无法读取消息。

备注:1、Kafka入门笔记

image.png

max.poll.records

控制每次poll方法返回的最大记录数量。

默认是500

image.png

如何选举Consumer Leader

回顾之前的内容

image.png

那么如何完成以上的逻辑的,我们跟踪代码:

image.png

1、消费者协调器与组协调器的通讯

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

对Broker的响应进行处理

image.png

image.png

1、消费者协调器发起入组请求

image.png

image.png

image.png

image.png

image.png

Consumer Leader如何制定分区方案

回顾之前的内容

image.png

消费者分区策略

消费者参数

partition.assignment.strategy

分区分配给消费者的策略。默认为Range。允许自定义策略。

Range

把主题的连续分区分配给消费者。(如果分区数量无法被消费者整除、第一个消费者会分到更多分区)

RoundRobin

把主题的分区循环分配给消费者。

image.png

StickyAssignor

初始分区和RoundRobin是一样

粘性分区:每一次分配变更相对上一次分配做最少的变动.

目标:

1、分区的分配尽量的均衡

2、每一次重分配的结果尽量与上一次分配结果保持一致

当这两个目标发生冲突时,优先保证第一个目标

比如有3个消费者(C0、C1、C2)、4个topic(T0、T1、T2、T34),每个topic有2个分区(P1、P2)

image.png

C0: T0P0、T1P1、T3P0

C1: T0P1、T2P0、T3P1

C2: T1P0、T2P1

如果C1下线 、如果按照RoundRobin

image.png

C0: T0P0、T1P0、T2P0、T3P0

C2: T0P1、T1P1、T2P1、T3P1

对比之前

image.png

如果C1下线 、如果按照StickyAssignor

image.png

C0: T0P0、T1P1、T2P0、T3P0

C2: T0P1、T1P0、T2P1、T3P1

对比之前

image.png

image.png

自定义策略

extends 类AbstractPartitionAssignor,然后在消费者端增加参数:

properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,类.class.getName());

即可。

消费者分区策略源码分析

接着上个章节的代码。

image.png

image.png

image.png

image.png

image.png

Consumer拉取数据

这里就是拉取数据,核心Fetch类

image.png

image.png

image.png

自动提交偏移量

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

image.png

当然,自动提交auto.commit.interval.ms

image.png

默认5s

image.png

从源码上也可以看出

maybeAutoCommitOffsetsAsync 最后这个就是poll的时候会自动提交,而且没到auto.commit.interval.ms间隔时间也不会提交,如果没到下次自动提交的时间也不会提交。

这个autoCommitIntervalMs就是auto.commit.interval.ms设置的

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/18670.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

设计模式——迭代器模式

迭代器模式(落没的设计模式) 定义 它提供一种方法访问一个容器对象中各个元素,而又不需要暴露该对象的内部细节。 迭代器是为容器服务的,能容纳元素的对象可以称为容器,例:List、Set、Map 迭代器模式&a…

操作系统实战45讲|01程序的运行过程、02几行汇编几行C

配置环境遇到的问题 virtualBox实现windows和Ubuntu之间的复制粘贴 1.修改设置,设置共享粘贴板为双向 或者在运行的虚拟机的窗口中进行设置 2.安装virtualbox增强功能 (1)直接联网安装 设备----安装增强功能 3.补充 windows中终端和文档复制的快捷键为ctrlc&…

Element Plus el-table 自定义合并行和列

原文链接&#xff1a;Element Plus el-table 自定义合并行和列 前言 目标效果是将表格行数据中某个属性值相同的项合并到一起&#xff0c;效果如下&#xff1a; <el-table :data"tableData" :span-method"spanMethod" style"width: 100%">…

Set up the compilation environment for ESP8266-RTOS-SDK using Git on Windows

Software to be installed&#xff1a; Git BashPython Environment&#xff0c;ESP8266 Master require Python v3 environment. Please check “add to PATH” while installing Open Git Bash&#xff0c;Enter a local disk&#xff0c;create an empty folder, enter the …

JVM常用参数

以下是 JVM 常用参数的配置 内存相关参数&#xff1a; -Xmx&#xff1a;指定 JVM 最大可用内存&#xff0c;例如 -Xmx2g 表示最大可用内存为 2GB。 -Xms&#xff1a;指定 JVM 初始内存大小&#xff0c;例如 -Xms512m 表示初始内存为 512MB。 -XX:MaxPermSize&#xff1a;指定…

Leap AI + Python 开发绘图应用

使用python语言&#xff0c;并借助Leap AI网站的api key&#xff0c;可以轻松实现AI绘图功能。使用时&#xff0c;用户只要输入prompt提示词&#xff0c;几秒钟之内服务器就能生成图片并返回图片的链接地址。开发人员可以利用这个功能开发个性化的绘图软件&#xff0c;或者整合…

Redis常见命令和使用示例

目录 1.使用官方文档学习redis 2.核心命令 SET GET 3.全局/通用命令 KEYS EXISTS DEL EXPIRE TTL TYPE 1.使用官方文档学习redis redis官网 点击搜索&#xff0c;输入&#xff0c;比如输入ping&#xff0c;会显示Commands,是一个命令&#xff0c;点击ping&#xff…

「深度学习之优化算法」(十二)水波算法

1. 水波算法简介 (以下描述,均不是学术用语,仅供大家快乐的阅读)   水波算法(Water wave optimization)是根据水波理论提出的优化算法。什么是水波理论?简单来说就是水波的宽度越小,其频率越高,频率与水波宽度的平方根成反比(具体细节我也不懂,物理方面的)。水波…

Spring AOP

Spring AOP &#x1f50e;定义&#x1f50e;AOP 的组成切面(Aspect)切点(Pointcut)通知(Advice)连接点(Join Point)总结 &#x1f50e;Spring AOPSpring AOP 的创建Spring AOP 的使用切点表达式Spring AOP 的实现原理 &#x1f50e;&#x1f338;&#x1f338;&#x1f338;完结…

报名开启 | DolphinDB 粉丝节,与你相约上海

作为量化爱好者&#xff0c;你是否在寻找更多志同道合的朋友&#xff1f; 作为技术达人&#xff0c;想探索因子挖掘、深度学习、AI领域的前沿技术&#xff1f; 7月22日 机会来了&#xff01; DolphinDB 首届线下粉丝节将于7月22日下午在上海举行&#xff01; 来现场&#xf…

面向对象进阶一(static,继承,多态)

面向对象进阶一 一、static二、继承2.1 继承的定义和特点2.2 继承内容、成员变量和成员方法的访问特点2.2.1继承内容2.2.2 成员变量的访问特点2.2.3 成员方法的访问方法、方法的重写 2.3 继承中构造方法的访问特点 三、this、super使用总结四、多态4.1 多态的基本概念4.2 多态调…