kafka消费者多线程开发

目录

前言

kafka consumer 设计原理

多线程的方案 

参考资料


前言

目前,计算机的硬件条件已经大大改善,即使是在普通的笔记本电脑上,多核都已经是标配了,更不用说专业的服务器了。如果跑在强劲服务器机器上的应用程序依然是单线程架构,那实在是有点暴殄天物了。不过,Kafka Java Consumer 就是单线程的设计,你是不是感到很惊讶。所以,探究它的多线程消费方案,就显得非常必要了。

kafka consumer 设计原理

 从 Kafka 0.10.1.0 版本开始,KafkaConsumer 就变为了双线程的设计,即用户主线程和心跳线程。

 所谓用户主线程,就是你启动 Consumer 应用程序 main 方法的那个线程,而新引入的心跳线程(Heartbeat Thread)只负责定期给对应的 Broker 机器发送心跳请求,以标识消费者应用的存活性(liveness)。引入这个心跳线程还有一个目的,那就是期望它能将心跳频率与主线程调用 KafkaConsumer.poll 方法的频率分开,从而解耦真实的消息处理逻辑与消费者组成员存活性管理。

 单线程的设计能够简化 Consumer 端的设计。Consumer 获取到消息后,处理消息的逻辑是否采用多线程,完全由你决定。这样,你就拥有了把消息处理的多线程管理策略从 Consumer 端代码中剥离的权利。

多线程的方案 

 我们要明确的是,KafkaConsumer 类不是线程安全的 (thread-safe)。所有的网络 I/O 处理都是发生在用户主线程中,因此,你在使用过程中必须要确保线程安全。简单来说,不能在多个线程中共享同一个 KafkaConsumer 实例,否则程序会抛出 ConcurrentModificationException 异常。

由于kafka consumer不是线程安全,我么你能制定两种多线程的方案。

1.消费者程序启动多个线程,每个线程维护专属的 KafkaConsumer 实例,负责完整的消息获取、消息处理流程。如下图所示:

 2.消费者程序使用单或多线程获取消息,同时创建多个消费线程执行消息处理逻辑。获取消息的线程可以是一个,也可以是多个,每个线程维护专属的 KafkaConsumer 实例,处理消息则交由特定的线程池来做,从而实现消息获取与消息处理的真正解耦。具体架构如下图所示:

我们来打个比方。比如一个完整的消费者应用程序要做的事情是 1、2、3、4、5,那么方案 1 的思路是粗粒度化的工作划分,也就是说方案 1 会创建多个线程,每个线程完整地执行 1、2、3、4、5,以实现并行处理的目标,它不会进一步分割具体的子任务;而方案 2 则更细粒度化,它会将 1、2 分割出来,用单线程(也可以是多线程)来做,对于 3、4、5,则用另外的多个线程来做。 

这两种方案的比较如下:

实现代码示例如下:

方案一的代码:

public class KafkaConsumerRunner implements Runnable {private final AtomicBoolean closed = new AtomicBoolean(false);private final KafkaConsumer consumer;public void run() {try {consumer.subscribe(Arrays.asList("topic"));while (!closed.get()) {ConsumerRecords records = consumer.poll(Duration.ofMillis(10000));//  执行消息处理逻辑}} catch (WakeupException e) {// Ignore exception if closingif (!closed.get()) throw e;} finally {consumer.close();}}// Shutdown hook which can be called from a separate threadpublic void shutdown() {closed.set(true);consumer.wakeup();}

这段代码创建了一个 Runnable 类,表示执行消费获取和消费处理的逻辑。每个 KafkaConsumerRunner 类都会创建一个专属的 KafkaConsumer 实例。在实际应用中,你可以创建多个 KafkaConsumerRunner 实例,并依次执行启动它们,以实现方案 1 的多线程架构

方案2 的代码:

private final KafkaConsumer<String, String> consumer;
private ExecutorService executors;
...private int workerNum = ...;
executors = new ThreadPoolExecutor(workerNum, workerNum, 0L, TimeUnit.MILLISECONDS,new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());...
while (true)  {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));for (final ConsumerRecord record : records) {executors.submit(new Worker(record));}
}
..

参考资料

20 | 多线程开发消费者实例-极客时间

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/116033.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

LeetCode 416.分割等和子集(动态规划【0-1背包问题】采用一维数组dp:滚动数组)

>>往期文章&#xff1a; 解决0-1背包问题&#xff08;方案一&#xff09;:二维dp数组_呵呵哒(&#xffe3;▽&#xffe3;)"的博客-CSDN博客 解决0-1背包问题&#xff08;方案二&#xff09;&#xff1a;一维dp数组&#xff08;滚动数组&#xff09;_呵呵哒(&…

LeetCode 1993. 树上的操作:大模拟

【LetMeFly】1993.树上的操作&#xff1a;大模拟 力扣题目链接&#xff1a;https://leetcode.cn/problems/operations-on-tree/ 给你一棵 n 个节点的树&#xff0c;编号从 0 到 n - 1 &#xff0c;以父节点数组 parent 的形式给出&#xff0c;其中 parent[i] 是第 i 个节点的…

【100天精通Python】Day68:Python可视化_Matplotlib 绘制热力图,示例+代码

目录 1 值热力图&#xff08;Value Heatmap&#xff09;: 2 密度热力图&#xff08;Density Heatmap&#xff09; 3 时间热力图&#xff08;Time Heatmap&#xff09;: 4 空间热力图&#xff08;Spatial Heatmap&#xff09; 5 渐变热力图&#xff08;Gradient Heatmap&am…

机器学习的主要内容

分类任务 回归任务 有一些算法只能解决回归问题有一些算法只能解决分类问题有一些算法的思路既能解决回归问题&#xff0c;又能解决分类问题 一些情况下&#xff0c; 回归任务可以转化为分类任务&#xff0c; 比如我们预测学生的成绩&#xff0c;然后根据学生的成绩划分为A类、…

JDK21新特性

JDK 21 于 2023 年 9 月 19 日正式发布。Oracle 提供GPL 下的生产就绪二进制文件&#xff1b;其他供应商的二进制文件也将很快推出。 Spring Boot 3.x 版本最低支持的 JDK 版本为 JDK 17&#xff0c;也就是说如果你还想用 JDK8的话&#xff0c;那能用的最高 Spring Boot 版本为…

代码随想录算法训练营 动态规划part17

一、回文子串 647. 回文子串 - 力扣&#xff08;LeetCode&#xff09; class Solution {public int countSubstrings(String s) {boolean[][] dp new boolean[s.length()][s.length()];int ans 0;for (int j 0; j < s.length(); j) {for (int i 0; i < j; i) {if …

linux安装sqoop

目录 一 解压安装包 二 修改配置文件 三 拷贝 jar 包 &#xff08;1&#xff09;sqoop147目录下补全 jar 包 &#xff08;2&#xff09;lib 目录下补全 jar 包 四 修改环境变量 五 查看 sqoop 版本以及测试连接 一 解压安装包 这里提供了网盘资源 链接: https://pan.ba…

三、初识FreeRTOS之FreeRTOS基础知识

从这节开始&#xff0c;我们正式学习FreeRTOS的一些基础知识&#xff0c;争取做到日更&#xff0c;或者隔日更。如果在学习的过程中哪里有理解错误&#xff0c;希望各位朋友批评指正。因为自己觉得图文并茂好像更容易理解一点&#xff0c;所以在博文中加了大量的图片&#xff0…

SpringMVC之自定义注解

目录 一.JAVA注解简介 1.1.Java注解分类 1.2.JDK元注解 二.自定义注解 1.1.如何自定义注解 1.2.自定义注解的基本案例 1.2.1.案例一&#xff08;获取类与方法上的注解值&#xff09; 1.2.2.案例二&#xff08;获取类属性上的注解属性值&#xff09; 1.2.3. 案例三&#xff…

Qt: 鼠标形状设置

设置全局鼠标形状 设置完毕后&#xff0c;整个APP的任何窗体&#xff0c;包括Dialog中的鼠标形状都会被修改为设定类型&#xff0c;某一个控件设定的鼠标形状将被替换。一般不建议使用 QCursor cursor;//创建鼠标对象 cursor.setShape(Qt::CursorShape::ClosedHandCursor);//…

【初阶数据结构】二叉树全面知识总结

二叉树详解 树的概念及其结构树的概念树的相关概念树的表示方法孩纸兄弟表示法双亲表示法&#xff08;并查集&#xff09; 树的实际应用 二叉树二叉树的概念二叉树的种类二叉树的性质二叉树的存储结构 二叉树顺序结构的实现堆的概念及结构堆向上、向下调整法堆的插入堆的删除堆…

Qt-day3

1、完成文本编辑器的保存工作 //保存按钮对应的槽函数 void Widget::on_saveBtn_clicked() {QString fileName QFileDialog::getSaveFileName(this, //父组件"保存文件", //对话框标题"./", //起始路径"All(*.…