Kafka-消费者-KafkaConsumer分析

与KafkaProducer不同的是,KafkaConsumer不是一个线程安全的类。

为了便于分析,我们认为下面介绍的所有操作都是在同一线程中完成的,所以不需要考虑锁的问题。

这种设计将实现多线程处理消息的逻辑转移到了调用KafkaConsumer的代码中,可以根据业务逻辑使用不同的实现方式。

例如,可以使用“线程封闭”的方式,每个业务线程拥有一个KafkaConsumer对象,这种方式实现简单、快速。

还可以使用两个线程池实现“生产者—消费者”模式,解耦消息消费和消息处理的逻辑。

其中一个线程池中每个线程拥有一个KafkaConsumer对象,负责从Kafka集群拉取消息,然后将消息放入队列中缓存,而另一个线程池中的线程负责从队列中获取消息,执行处理消息的业务逻辑。

下面开始对KafkaConsumer的分析。

KafkaConsumer实现了Consumer接口,Consumer接口中定义了KafkaConsumer对外的API,其核心方法可以分为下面六类。

  • subscribe()方法:订阅指定的Topic,并为消费者自动分配分区。
  • assign()方法:用户手动订阅指定的Topic,并且指定消费的分区。此方法与subscribe()方法互斥。
  • commit*()方法:提交消费者已经消费完成的offset。
  • seek*()方法:指定消费者起始消费的位置。
  • poll()方法:负责从服务端获取消息。
  • pause()、resume()方法:暂停/继续Consumer,暂停后poll方法会返回空。

了解了Consumer接口定义的功能之后,我们下面就来分析KafkaConsumer的具体实现。首先,我们需要了解KafkaConsumer中重要的字段,如图所示。

在这里插入图片描述

  • PRODUCER_CLIENT_ID_SEQUENCE:clientld的生成器,如果没有明确指定client的Id,则使用字段生成一个ID。
  • clientld:Consumer的唯一标示。
  • coordinator:控制着Consumer与服务端GroupCoordinator之间的通信逻辑,可以将其理解成Consumer与服务端GroupCoordinator通信的门面。
  • keyDeserializer和valueDeserializer:key反序列化器和value反序列化器。
  • fetcher:负责从服务端获取消息。
  • interceptors:Consumerlnterceptor集合,ConsumerInterceptor.onConsumer()方法可以在消息通过poll()方法返回给用户之前对其进行拦截或修改;ConsumerInterceptor.onCommit()方法也可以在服务端返回提交offset成功的响应时对其进行拦截或修改。
  • client:负责消费者与Kafka服务端的网络通信。
  • subscriptions:维护了消费者的消费状态。
  • metadata:记录了整个Kafka集群的元信息。
  • currentThread和refcount:分别记录了当前使用KafkaConsumer的线程Id和重入次数,KafkaConsumer的acquire()方法和release()方法实现了一个“轻量级锁”,它并非真正的锁,仅是检测是否有多线程并发操作KafkaConsumer而已。

在后面的分析过程中,我们会逐个分析KafkaConsumer依赖的组件的功能和实现。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/411281.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于nodejs+vue+uniapp社区居民健康问诊管理系统 微信小程序

系统实现阶段的根本目标在这个阶段的设计工作中精确地描述出目标系统,从而在编码阶段可以直接根据这个描述翻译成用程序语言编写的系统。 系统实现的基本任务有以下几个: 语言:javapythonnodejsphpnodejs均可选 运行软件:idea/eclipse/vsc…

Three.js 学习笔记之模型(学习中1.17更新)

文章目录 模型 几何体 材质模型点模型Points - 用于显示点线模型Line | LineLoop | LineSegments网格模型mesh - 三角形 几何体BufferGeometry缓冲类型几何体BufferGeometry - 没有任何形状的空几何体创建几何体的方式BufferAttribute Types定义顶点法线 geometry.attributes…

Kafka系列(四)

本文接kafka三,代码实践kafkaStream的应用,用来完成流式计算。 kafkastream 关于流式计算也就是实时处理,无时间概念边界的处理一些数据。想要更有性价比地和java程序进行结合,因此了解了kafka。但是本人阅读了kafka地官网&#…

Revealing the Dark Secrets of MIM

论文名称: Revealing the Dark Secrets of Masked Image Modeling 发表时间:CVPR2022 作者及组织:Zhenda Xie, Zigang Geng, Hu Han等,来自清华,中科院,微软亚洲研究院。 前言 本文尝试探讨MIM为何有效的原…

Java SE入门及基础(19)

二维数组 1. 数组的本质 数组从本质上来说只有一维,二维数组是指在一维数组中再放入一个一维数组。三维数组、四维数组依次类推。 2. 二维数组的定义 语法 数据类型[][] 数组名 new 数据类型[数组的长度][数组的长度]; 示例 public static void main ( Str…

Skywalking链路追踪

目录 一、简介1.1、APM系统1.2、SkyWalking 简介 二、快速入门2.1、下载、启动2.2、界面认识 三、持久化存储四、告警通知五、自定义追踪-细粒度追踪service方法 一、简介 1.1、APM系统 APM(Application Performance Monitoring)系统是一种用于监控和管…

FindMy技术与相机结合

FindMy是苹果公司提供的设备追踪服务,用来帮助用户定位丢失的设备。自苹果公司开放Findmy网络之后,FindMy技术便与各种生活设备相结合,比如与相机的结合。 想象一下,你正在外出办事或者旅行时,突然意识到相机丢了&…

MySQL中根据出生日期计算年龄

创建student表 mysql> create table student( -> sid int primary key comment 学生号, -> sname varchar(20) comm…

Android-常用数据结构和控件

HashMap 的原理 HashMap 的内部可以看做数组链表的复合结构。数组被分为一个个的桶(bucket)。哈希值决定了键值对在数组中的寻址。具有相同哈希值的键值对会组成链表。需要注意的是当链表长度超过阈值(默认是8)的时候会触发树化,链表会变成树形结构。 把握HashMap的…

Pytorch各种Dropout层应用于详解

目录 torch框架Dropout functions详解 dropout 用途 用法 使用技巧 参数 数学理论公式 代码示例 alpha_dropout 用途 用法 使用技巧 参数 数学理论公式 代码示例 feature_alpha_dropout 用途 用法 使用技巧 参数 数学理论 代码示例 dropout1d 用途 用…

注解实现校验接口传参是否超出取值范围

文章目录 1、定义注解2、使用注解3、其余校验实现思路2.04、其余校验实现思路3.0 写接口,Dto里很多字段要检验传参范围,自定义个注解来校验。 1、定义注解 注解定义代码: import javax.validation.Constraint; import javax.validation.Con…

基于python+uniapp的网上订餐系统的设计与实现 微信小程序

考虑到实际生活中在晓海网上订餐方面的需要以及对该系统认真的分析,将系统权限按管理员和用户这两类涉及用户划分。 (1)用户功能需求 用户进入APP可以进行首页、系统公告、在线投诉、我的等操作,在我的页面可以对菜品评价、订单信…