Kafka-消费者-Consumer Group Rebalance设计

在同一个Consumer Group中,同一个Topic的不同分区会分配给不同的消费者进行消费,那么为消费者分配分区的操作是在Kafka服务端完成的吗?分区是如何进行分配呢?下面来分析Rebalance操作的原理。

方案一

Kafka最开始的解决方案是通过ZooKeeper的Watcher实现的。

每个Consumer Group在ZooKeeper下都维护了一个“/consumers/[group_id]/ids”路径,在此路径下使用临时节点记录属于此Consumer Group的消费者的Id,由Consumer启动时创建。

还有两个与ids节点同级的节点,它们分别是:
owners节点,记录了分区与消费者的对应关系;
offsets节点,记录了此Consumer Group在某个分区上的消费位置。

每个Broker、Topic以及分区在ZooKeeper中也都对应一个路径,如下所示。

  • /brokers/ids/broker_id:记录了host、port以及分配在此Broker上的Topic的分区列表。
  • /brokers/topics/[topic_name]:记录了每个Partition的Leader、ISR等信息。
  • /brokers/topics/[topic_name]/partitions/[partition_num]/state:记录了当前Leader、选举epoch等信息
    路径图如图所示。

在这里插入图片描述
每个Consumer都分别在“/consumers/[group_id]/ids”和“brokers/ids”路径上注册一个Watcher。

当“/consumers/[group_id]/ids”路径的子节点发生变化时,表示ConsumerGroup中的消费者出现了变化;

当“/brokers/ids”路径的子节点发生变化时,表示Broker出现了增减。
这样,通过Watcher,每个消费者就可以监控Consumer Group和Kafka集群的状态了。

这个方案看上去不错,但是严重依赖于ZooKeeper集群,有两个比较严重的问题:

  • 羊群效应(Herd Effect):先解释一下什么是“羊群效应”,一个被Watch的ZooKeeper节点变化,导致大量的Watcher通知需要被发送给客户端,这将导致在通知期间其他操作延迟。

    一般出现这种情况的主要原因就是没有找到客户端真正的关注点,也算是滥用Watcher的一种场景。
    继续前面的分析,任何Broker或Consumer加入或退出,都会向其余所有的Consumer发送Watcher通知触发Rebalance,就出现了“羊群效应”。

  • 脑裂(Split Brain):每个Consumer都是通过ZooKeeper中保存的这些元数据判断Consumer Group状态、Broker的状态以及Rebalance结果的,由于ZooKeeper只保证“最终一致性”,不保证“Simultaneously Consistent Cross-Client Views”,不同Consumer在同一时刻可能连接到ZooKeeper集群中不同的服务器,看到的元数据就可能不一样,这就会造成不正确的Rebalance尝试。

方案二

由于上述两个原因,Kafka的后续版本对Rebalance操作进行了改进,也对消费者进行了重新设计。

其核心设计思想是:将全部的Consumer Group分成多个子集,每个Consumer Group子集在服务端对应一个GroupCoordinator对其进行管理,GroupCoordinator是KafkaServer中用于管理Consumer Group的组件,消费者不再依赖ZooKeeper,而只有GroupCoordinator在ZooKeeper上添加Watcher。

消费者在加入或退出Consumer Group时会修改ZooKeeper中保存的元数据,这点与上文描述的方案一类似,此时会触发GroupCoordinator设置的Watcher,通知GroupCoordinator开始Rebalance操作。

下面简述这个过程:

  1. 当前消费者准备加入某Consumer Group或是GroupCoordinator发生故障转移时,消费者并不知道GroupCoordinator的网络位置,消费者会向Kafka集群中的任一Broker发送ConsumerMetadataRequest,此请求中包含了其Consumer Group的Groupld,收到请求的Broker会返回ConsumerMetadataResponse作为响应,其中包含了管理此ConsumerGroup的GroupCoordinator的相关信息。

  2. 消费者根据ConsumerMetadataResponse中的GroupCoordinator信息,连接到GroupCoordinator并周期性地发送HeartbeatRequest,HeartbeatRequest的具体格式在后面会详细介绍。

    发送HeartbeatRequest的主要作用是为了告诉GroupCoordinator此消费者正常在线,GroupCoordinator会认为长时间未发送HeartbeatRequest的消费者已经下线,触发新一轮的Rebalance操作。

  3. 如果HeartbeatResponse中带有IllegalGeneration异常,说明GroupCoordinator发起了Rebalance操作,此时消费者发送JoinGroupRequest(具体格式在后面介绍)给GroupCoordinator,JoinGroupRequest的主要目的是为了通知GroupCoordinator,当前消费者要加入指定的Consumer Group。

    之后,GroupCoordinator会根据收到的JoinGroupRequest和ZooKeeper中的元数据完成对此Consumer Group的分区分配。

  4. GroupCoordinator会在分配完成后,将分配结果写入ZooKeeper保存,并通过JoinGroupResponse返回给消费者。消费者就可以根据JoinGroupResponse中分配的分区开始消费数据。

  5. 消费者成功成为Consumer Group的成员后,会周期性发送HeartbeatRequest。如果HeartbeatResponse包含IlegalGeneration异常,则执行步骤3。如果找不到对应的GroupCoordinator(HeartbeatResponse包含NotCoordinatorForGroup异常),则周期性地执行步骤1,直至成功。

这里只是简略地描述此方案的步骤,整个方案还是有点复杂的,其中比较严谨地描述了消费者和GroupCoordinator的状态图和各个阶段可能发生的故障以及故障转移处理,本文重点关注Consumer Group Rebalance方面。

上面这种方案虽然解决了“羊群效应”、“脑裂”问题,但是还是有两个问题:

  • 分区分配的操作是在服务端的GroupCoordinator中完成的,这就要求服务端实现Partition的分配策略。当要使用新的Partition分配策略时,就必须修改服务端的代码或配置,之后重启服务,这就显得比较麻烦。

  • 不同的Rebalance策略有不同的验证需求。当需要自定义分区分配策略和验证需求时,就会很麻烦。

方案三

为了解决上述问题,Kafka进行了重新设计,将分区分配的工作放到了消费者这一端进行处理,而Consumer Group管理的工作则依然由GroupCoordinator处理。

这就让不同的模块关注不同的业务,实现了业务的切分和解耦,这种思想在设计时很重要。

重新设计后的协议在上一版本的协议上进行了修改,将JoinGroupRequest的处理过程拆分成了两个阶段,分别是Join Group阶段和Synchronizing Group State阶段。

当消费者查找到管理当前Consumer Group的GroupCoordinator后,就会进入Join Group阶段,Consumer首先向GroupCoordinator发送JoinGroupRequest请求,其中包含消费者的相关信息;

服务端的GroupCoordinator收到JoinGroupRequest后会暂存消息,收集到全部消费者之后,根据JoinGroupRequest中的信息来确定Consumer Group中可用的消费者,从中选取一个消费者成为Group Leader,还会选取使用的分区分配策略,最后将这些信息封装成JoinGroupResponse返回给消费者。

虽然每个消费者都会收到JoinGroupResponse,但是只有Group Leader收到的JoinGroupResponse中封装了所有消费者的信息。

当消费者确定自己是Group Leader后,会根据消费者的信息以及选定的分区分配策略进行分区分配。

在Synchronizing Group State阶段,每个消费者会发送SyncGroupRequest到GroupCoordinator,但是只有Group Leader的SyncGroupRequest请求包含了分区的分配结果,GroupCoordinator根据Group Leader的分区分配结果,形成SyncGroupResponse返回给所有Consumer。

消费者收到SyncGroupResponse后进行解析,即可获取分配给自身的分区。

最后,我们来了解消费者的状态转移与各请求之间的关系,如图所示。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/411136.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

一条sql是如何运行的

在我们平时使用sql的时候,基本是基于黑盒的使用方式,在客户端输入一条sql语句,然后回显想要的数据,对于mysql server端内部如何运行的以及与存储引擎如何交互的不得而知。 通过下面一幅图,大致描述客户端和服务端交互…

利用IP应用场景API识别真实用户

引言 在当今数字化时代,随着互联网的普及和应用的广泛,验证用户身份的重要性变得越来越突出。在许多场景中,特别是在涉及安全性、用户体验以及个人隐私保护方面,确定用户的真实身份至关重要。而IP应用场景API则是一种强大的工具&…

FreeRTOS学习第7篇--周期性延迟和相对性延迟函数

目录 FreeRTOS学习第7篇--周期性延迟和相对性延迟函数时间延迟vTaskDelay函数原型vTaskDelayUntil函数原型PrintTask_Task任务相关代码片段实验现象本文中使用的测试工程 FreeRTOS学习第7篇–周期性延迟和相对性延迟函数 本文目标:学习与使用FreeRTOS中的延迟函数&…

Spring Boot - Application Events 的发布顺序_ApplicationFailedEvent

文章目录 Pre概述Code源码分析 Pre Spring Boot - Application Events 的发布顺序_ApplicationEnvironmentPreparedEvent 概述 Spring Boot 的广播机制是基于观察者模式实现的,它允许在 Spring 应用程序中发布和监听事件。这种机制的主要目的是为了实现解耦&#…

选择安全数据交换系统时 要考虑哪些因素?

安全数据交换系统是一种专门设计用于在不同的网络环境(如内部不同网络,内部网络和外部网络)之间安全传输数据的解决方案。它通常包括一系列的技术和流程,旨在确保数据在传输过程中的完整性、机密性和可用性。 安全数据交换系统可以…

Maven工程 — 继承与聚合 相关知识点详解

简介:这篇帖子主要讲解Maven工程中的继承与聚合的相关知识点,用简洁的语言和小编自己的理解,深入浅出的说明Maven工程的继承与聚合。 目录 1、继承 1.1 继承关系的实现 1.2 版本锁定 2、聚合 2.1 聚合方法 3、总结 1、继承 图 1-1 继承…

springBoot项目打包发布

打包 项目代码编写完成后&#xff0c;在pom.xml文件中引用打包的插件&#xff1a; <!-- 打包插件坐标--><build><plugins><!--打包插件--><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-mave…

vue 指定区域可拖拽的限定拖拽区域的div(如仅弹窗标题可拖拽的弹窗)

<template><div class"container" ref"container"><div class"drag-box" v-drag><div class"win_head">弹窗标题</div><div class"win_content">弹窗内容</div></div><…

探索设计模式的魅力:抽象工厂模式的艺术

抽象工厂模式&#xff08;Abstract Factory Pattern&#xff09;是一种创建型设计模式&#xff0c;用于在不指定具体类的情况下创建一系列相关或相互依赖的对象。它提供了一个接口&#xff0c;用于创建一系列“家族”或相关依赖对象&#xff0c;而无需指定它们的具体类。 主要参…

Jest 28发布

Jest 28终于来了&#xff0c;它带来了一些长期以来一直要求的特性&#xff0c;比如支持跨多台机器的测试运行分片、包导出和自定义假计时器行为的能力。 新特性 安装大小减少了大约 1/3 正如在去年的Jest 27博客中所宣布的那样&#xff0c;已经从默认安装中删除了一些不再默…

阿赵UE学习笔记——11、地形系统

阿赵UE学习笔记目录 大家好&#xff0c;我是阿赵。   继续学习虚幻引擎的用法&#xff0c;这次来学习一下虚幻引擎的地形系统的用法。 一、创建地形 在选项模式里面&#xff0c;选择地形&#xff1a; 进入到地形界面之后&#xff0c;需要先创建一个地形&#xff1a; 留意看…

stm32 - GPIO

stm32 - GPIO 基本结构输入输出 基本结构 所有GPIO都挂在APB2总线上 寄存器&#xff1a;内核通过APB2总线对寄存器进行读写&#xff0c;实现电平的读写 GPIO引脚的每一位对应寄存器中的某一位 GPIO中的驱动器是增加信号驱动能力的&#xff0c;用于增大驱动能力 输入 读取端口的…

吃瓜教程Task1:概览西瓜书+南瓜书第1、2章

由于本人之前已经学习过西瓜书&#xff0c;本次学习主要是对以往知识的查漏补缺&#xff0c;因此本博客记录了在学习西瓜书中容易混淆的点以及学习过程中的难点。更多学习内容可以参考下面的链接&#xff1a; 南瓜书的地址&#xff1a;https://github.com/datawhalechina/pumpk…

mobi文件怎么转换成pdf?

mobi文件怎么转换成pdf&#xff1f;在数字化时代&#xff0c;电子书籍成为了越来越受欢迎的阅读方式。我们可以通过多种格式的电子书来获取知识和娱乐&#xff0c;其中一种常见的格式就是Mobi文件。Mobi文件是亚马逊公司开发的一种电子书格式&#xff0c;它主要用于Kindle设备和…

C语言通过MSXML6.0读写XML文件(同时支持char[]和wchar_t[]字符数组)

开发环境&#xff1a;Visual Studio 2010 运行环境&#xff1a;Windows XP SP3 第一节 读取XML文件&#xff08;使用wchar_t[]字符数组&#xff09; /* 这个程序只能在C编译器下编译成功, 请确保源文件的扩展名为c */ #define COBJMACROS #include <stdio.h> #include …

Springboot 子工程构建完后无法找到springboot依赖

问题: 构建完子工程后无法找到SpringBootTest 解决方案: 最好用这个构建 https://www.cnblogs.com/he-wen/p/16735239.html 1.先观察项目目录 是否正确 2.观察子工程目录 3.看pom.xml中是否引用springboot依赖 4.检查代码 查看父项目是否包含子模块 查看子模块的父项目是否…

uniapp写微信小程序实现电子签名

写电子签名一定要注意的是一切全部按照手机上的适配来&#xff0c;为啥这么说呢&#xff0c;因为你在微信开发者工具中调试的时候认为是好的&#xff0c;正常的非常nice,当你发布版本的时候你会发现问题出来了。我下边的写法你可以直接用很简单。就是要记住canvas的几个属性和用…

基于SSM的校园闲置物品交易平台设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…

教师语言的重要性体现在哪些方面?

教师&#xff0c;这个职业被誉为人类灵魂的工程师&#xff0c;他们的话语有着无可比拟的力量。有时&#xff0c;一句话就能点亮一个孩子的世界&#xff0c;也可以打破一个孩子的求知欲望。那么&#xff0c;教师话语的重要性究竟体现在哪些方面呢&#xff1f; 教师的话语是激发…

帮管家 CRM init 信息泄露漏洞

此文所提供的信息只为网络安全人员对自己所负责的网站、服务器等&#xff08;包括但不限于&#xff09;进行检测或维护参考&#xff0c;未经授权请勿利用文章中的技术资料对任何计算机系统进行入侵操作。利用此文所提供的信息而造成的直接或间接后果和损失&#xff0c;均由使用…