Kafka-消费者-KafkaConsumer分析-ConsumerNetworkClient

前面介绍过NetworkClient的实现,它依赖于KSelector、InFlightRequests、Metadata等组件,负责管理客户端与Kafka集群中各个Node节点之间的连接,通过KSelector法实现了发送请求的功能,并通过一系列handle*方法处理请求响应、超时请求以及断线重连。

ConsumerNetworkClient在NetworkClient之上进行了封装,提供了更高级的功能和更易用的API。

在图中展示了ConsumerNetworkClient的核心字段以及其依赖的组件。

在这里插入图片描述

  • client:NetworkClient对象。
  • delayedTasks:定时任务队列,DelayedTaskQueue是Kafka提供的定时任务队列的实现,其底层是使用JDK提供的PriorityQueue实现。
    简单介绍一下PriorityQueue,这是一个非线程安全的、无界的、优先级队列,实现原理是小顶堆,底层是基于数组实现的,其对应的线程安全实现是PriorityBlockingQueue,这个定时任务队列中是心跳任务。
  • metadata:用于管理Kafka集群元数据。
  • unsent:缓冲队列,Map<Node,List类型,key是Node节点,value是发往此Node的ClientRequest集合。
  • unsentExpiryMs:ClientRequest在unsent中缓存的超时时长。
  • wakeup:由调用KafkaConsumer对象的消费者线程之外的其他线程设置,表示要中断KafkaConsumer线程。
  • wakeupDisabledCount:KafkaConsumer是否正在执行不可中断的方法。每进入一个不可中断的方法时,则增加一,退出不可中断方法时,则减少一。
    wakeupDisabledCount只会被KafkaConsumer线程修改,其他线程不能修改。

ConsumerNetworkClient.poll()方法是ConsumerNetworkClient中最核心的方法,poll方法有多个重载,最终会调用poll(long timeout,long now,boolean executeDelayedTasks)重载,这三个参数的含义分别是:

  • timeout表示执行poll方法的最长阻塞时间(单位是ms),如果为0,则表示不阻塞;
  • now表示当前时间戳;
  • executeDelayedTasks表示是否执行delayedTasks队列中的定时任务。

下面介绍其流程,其中简单回顾一下NetworkClient的功能:

  1. 调用ConsumerNetworkClient.trySend方法循环处理unsent中缓存的请求。

    具体逻辑是:对每个Node节点,循环遍历其对应的ClientRequest列表,每次循环都调用NetworkClient.ready方法检测消费者与此节点之间的连接,以及发送请求的条件。

    若符合发送条件,则调用NetworkClient.send()方法将请求放入InFlightRequests队中等待响应,也放入KafkaChannel的send字段中等待发送,并将此消息从列表中删除。实现代码如下:
    在这里插入图片描述

  2. 计算超时时间,此超时时间由timeout与delayedTasks队列中最近要执行的定时任务的时间共同决定。在下面的NetworkClient.poll()方法中,会使用此超时时间作为最长阻塞时长,避免影响定时任务的执行。

  3. 调用NetworkClient.poll方法,将KafkaChannel.send字段指定的消息发送出去。除此之外,NetworkClient.poll()方法可能会更新Metadata使用一系列handle*方法处理请求响应、连接断开、超时等情况,并调用每个请求的回调函数。

  4. 调用ConsumerNetworkClient.maybeTriggerWakeup方法,检测wakeup和wakeupDisabledCount,查看是否有其他线程中断。如果有中断请求,则抛出WakeupException异常,中断当前ConsumerNetworkClient.poll方法。

在这里插入图片描述

  1. 调用checkDisconnects方法检测连接状态。检测消费者与每个Node之间的连接状态,当检测到连接断开的Node时,会将其在unsent集合中对应的全部ClientRequest对象清除掉,之后调用这些ClientRequest的回调函数。

在这里插入图片描述

  1. 根据executeDelayedTasks参数决定是否处理delayedTasks队列中超时的定时任务,如果需要执行delayedTasks队列中的定时任务,则调用delayedTasks.poll()方法。

  2. 再次调用trySend方法。在步骤3中调用了NetworkClient.poll方法,在其中可能已经将KafkaChannel.send字段上的请求发送出去了,也可能已经新建了与某些Node的网络连接,所以这里再次尝试调用trySend方法。

  3. 调用ConsumerNetworkClient.failExpiredRequests()处理unsent中超时请求。它会循环遍历整个unsent集合,检测每个ClientRequest是否超时,调用超时ClientRequest的回调函数,并将其从unsent集合中删除。

在这里插入图片描述
分析完poll方法的详细步骤之后,我们下面来看其实现代码:

在这里插入图片描述
pollNoWakeup方法是poll方法的变体,表示执行不可被中断的poll方法。

具体逻辑是:在执行poll方法之前,会调用disableWakeups方法将wakeupDisabledCount加一,然后调用poll方法。这样,即使其他线程请求中断,也不会被响应。

poll(future)是poll方法的另一个实现阻塞发送请求的功能,代码如下所示。

在这里插入图片描述
在ConsumerNetworkClient.send方法中,会将待发送的请求封装成ClientRequest,然后保存到unsent集合中等待发送,具体代码如下。

在这里插入图片描述

在这里需要重点关注的是KafkaConsumer中使用的回调对象—RequestFutureCompletionHandler,其继承关系如图所示。

在这里插入图片描述
从RequestFutureCompletionHandler的继承关系上我们可以知道,它不仅实现了RequestCompletionHandler,它还继承了RequestFuture类。RequestFuture是一个泛型类,其核心字段如下所示。

  • isDone:表示当前请求是否已经完成,不管正常完成还是出现异常,此字段都会被设置为true。
  • exception:记录导致请求异常完成的异常类,与value字段互斥。此字段非空则表示出现异常,反之则表示正常完成。
  • value:记录请求正常完成时收到的响应,与exception字段互斥。此字段非空表示正常完成,反之表示出现异常。
  • listeners:RequestFutureListener集合, 用来监听请求完成的情况。RequestFutureListener接口有onSuccess()和onFailure()两个方法,对应于请求正常完成和出现异常两种情况。

在RequestFuture中有两处典型设计模式的使用:一处是compose方法,使用了适配器模式;另一处是chain方法,使用了责任链模式。下面是compose方法的相关代码:

在这里插入图片描述
图展示了使用compose()方法进行适配后,回调时的调用过程,也可以认为是请求完成的事件传播流程。

当调用RequestFuture对象的complete()或raise()方法时,会调用RequestFutureListener的onSuccess()或onFailure()方法,然后调用RequestFutureAdapter<T,S>的对应方法,最终调用RequestFuture对象的对应方法。

在这里插入图片描述
RequestFuture.chain()方法的实现与compose()类似,也是通过RequestFutureListener在多个RequestFuture之间传递事件。下面是其具体代码:

在这里插入图片描述

RequestFuture提供了一系列检查请求完成情况的方法,以及管理listeners的方法,代码比较简单,不再赘述了。

介绍完RequestFutureCompleteHandler之后,回到ConsumerNetworkClient的分析上来。下面简单介绍ConsumerNetworkClient中几个常用的功能,代码比较简单,就不贴出来了:

  • awaitMetadataUpdate()方法:循环调用poll方法,直到Metadata版本号增加,实现阻塞等待Metadata更新完成。
  • awaitPendingRequests()方法:等待unsent和InFightRequests中的请求全部完成(正常收到响应或出现异常)。
  • put()方法:向unsent中添加请求。
  • schedule()方法:向delayedTasks队列中添加定时任务。
  • leastLoadedNode()方法:查找Kafka集群中负载最低的Node。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/411328.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

阿里云云原生助力安永创新驱动力实践探索

云原生正在成为新质生产力变革的核心要素和企业创新的数字基础设施。2023 年 12 月 1 日&#xff0c;由中国信通院举办的“2023 云原生产业大会”在北京召开。在大会“阿里云云原生”专场&#xff0c;安永科技咨询合伙人王祺分享了对云原生市场的总览及趋势洞见&#xff0c;及安…

React的合成事件

合成事件&#xff1a;通过事件委托&#xff0c;利用事件传播机制&#xff0c;当事件传播到document时&#xff0c;再进行分发到对应的组件&#xff0c;从而触发对应所绑定的事件&#xff0c;然后事件开始在组件树DOM中走捕获冒泡流程。 原生事件 —— > React事件 —— >…

小白准备蓝桥杯之旅(c/c++b组)

前言&#xff1a;省赛获奖比例高达百分之60,只要比一半的人努力&#xff0c;你就能大概率获奖。 寒假做的3件事 1.稳基础 熟练掌握基础语法部分&#xff0c;c比c多个stl库优势&#xff0c;c语言的同学需要会实现c中stl库部分 2.刷真题 大概比赛前30天&#xff0c;坚持每天做…

QQ文档删除了怎么恢复?记住这3个方法!

我们有时会因为误操作或者需要清理空间而删除一些不需要的文件&#xff0c;包括我们在QQ平台上接收的文档。但是&#xff0c;一旦我们删除了这些文档&#xff0c;如果没有备份的情况下就难以找回。 那么&#xff0c;如果我们在删除QQ文档后想要恢复它们&#xff0c;应该怎么做…

HarmonyOS —— buildMode 设置(对比 Android Build Varient)

前言 在安卓中 Build Variant 主要依赖模块&#xff08;module&#xff09;中 build.gradle 的 BuildType 和 ProductFlavor 提供的属性和方法&#xff0c;我们可以使用 Build Type 可以配置不同的构建方式、ProductFlavor 主要用来进行多渠道打包。 在鸿蒙中要做到同样像效果…

聚观早报 | OpenAI组建新团队;Nexperia推出新款全新产品

聚观早报每日整理最值得关注的行业重点事件&#xff0c;帮助大家及时了解最新行业动态&#xff0c;每日读报&#xff0c;就读聚观365资讯简报。 整理丨Cutie 1月18日消息 OpenAI组建新团队 Nexperia推出全新产品 苹果推出Vision Pro应用商店 华为nova 12 Pro心钥套装 蔚…

Baumer工业相机堡盟工业相机如何通过NEOAPI SDK使用相机日志跟踪功能(C++)

Baumer工业相机堡盟工业相机如何通过NEOAPI SDK使用相机日志跟踪功能&#xff08;C&#xff09; Baumer工业相机Baumer工业相机NEOAPI SDK和短曝光功能的技术背景Baumer工业相机通过NEOAPI SDK使用相机日志跟踪功能1.引用合适的类文件2.通过NEOAPI SDK使用相机日志跟踪功能3.通…

Verilog刷题笔记16

题目&#xff1a; Since digital circuits are composed of logic gates connected with wires, any circuit can be expressed as some combination of modules and assign statements. However, sometimes this is not the most convenient way to describe the circuit. Pro…

[Vue]从数据库中动态加载阿里巴巴矢量图标的两种方式

记录一次在Vue中动态使用阿里巴巴矢量图标库 这是本人第一次使用阿里巴巴的矢量图标库&#xff0c;简单的导入和使用的话网上的教程很多&#xff0c;这里不多赘述&#xff0c;本人的需求是从数据库中加载出来并且显示到页面上&#xff0c;接下来简述一下如何实现。 以下代码均是…

国产阿里的Copilot能提效30%吗?

国产阿里的Copilot能提效30%吗&#xff1f; Copilot简介 GitHub 和 OpenAI 共同打造的一款编程神器–Copilot&#xff0c; 这是一款立足于人工智能技术的编程助手。在此基础上&#xff0c;借助于 GitHub 庞大的代码库和来自全球的开源社区帮助&#xff0c;搭配 OpenAI 在自然…

el-dialog嵌套使用,只显示遮罩层的问题

直接上解决方法 <!-- 错误写法 --><el-dialog><el-dialog></el-dialog></el-dialog><!-- 正确写法 --><el-dialog></el-dialog><el-dialog></el-dialog>我是不建议嵌套使用的&#xff0c;平级也能调用&#xff0c…

vite和webpack的区别和作用

前言 Vite 和 Webpack 都是现代化的前端构建工具&#xff0c;它们可以帮助开发者优化前端项目的构建和性能。虽然它们的目标是相似的&#xff0c;但它们在设计和实现方面有许多不同之处。 一、Vite详解和作用 vite 是什么 vite —— 一个由 vue 作者尤雨溪开发的 web 开发工…