kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

目录

  • 1- 单播模式,只有一个消费者组
  • 2- 广播模式,多个消费者组
  • 3- Java实践

kafka是由Apache软件基金会开发的一个开源流处理平台。kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者在网站中的所有动作流数据。
kafka中partition类似数据库中的分表数据,可以起到水平扩展数据的目的,比如有a,b,c,d,e,f 6个数据,某个topic有两个partition,一般情况下partition-0存储a,c,e3个数据,partition-1存储b,d,f另外3个数据。

1- 单播模式,只有一个消费者组

topic只有1个partition,该组内有多个消费者时,此时同一个partition内的消息只能被该组中的一个consumer消费。当消费者数量多于partition数量时,多余的消费者是处于空闲状态的,如图1所示。topic,test只有一个partition,并且只有1个group,G1,该group内有多个consumer,只能被其中一个消费者消费,其他的处于空闲状态。

在这里插入图片描述
该topic有多个partition,该组内有多个消费者,比如test 有3个partition,该组内有2个消费者,那么可能就是C0对应消费p0,p1内的数据,c1对应消费p2的数据;如果有3个消费者,就是一个消费者对应消费一个partition内的数据了。图解分别如图2,图3.这种模式在集群模式下使用是非常普遍的,比如我们可以起3个服务,对应的topic设置3个partiition,这样就可以实现并行消费,大大提高处理消息的效率。

在这里插入图片描述
在这里插入图片描述

2- 广播模式,多个消费者组

如果想实现广播的模式就需要设置多个消费者组,这样当一个消费者组消费完这个消息后,丝毫不影响其他组内的消费者进行消费,这就是广播的概念。

多个消费者组,1个partition;
该topic内的数据被多个消费者组同时消费,当某个消费者组有多个消费者时也只能被一个消费者消费.

在这里插入图片描述

多个消费者组,多个partition

该topic内的数据可被多个消费者组多次消费,在一个消费者组内,每个消费者又可对应该topic内的一个或者多个partition并行消费,如图

在这里插入图片描述

3- Java实践

这里使用Java服务进行实践,模拟2个parition,然后同一个组内有2个消费者的情况:

首先创建一个发送消息的controller方法:

@ApiOperation(value = "向具有kafka-2个partition的topic发送信息")@RequestMapping(value = "/testSendMessage2", method = RequestMethod.POST)public String testSendMessage(@RequestParam("msg") String msg) {KafkaTemplate.send(KafkaTopicEnum.TEST_TWO_PARTITION_MSG.code,msg);System.out.println("发送的消息是:"+msg);return "2个partition的topic数据!--ok";}

然后再创建一个监听类监听该topic,这里的监听类即为消费者。

/*** @date 2020-09-24* 两个partition的topic,同一个组的两个消费者就可以并行的消费了,需要kafka也是集群才行,单机版并不支持* @param consumerRecord* @param acknowledgment*/@KafkaListener(topics = "two-partition-msg",groupId ="serverGroup1",containerFactory = "ackContainerFactory")public void receiveKafkaTwoParMsg(ConsumerRecord<?,?> consumerRecord, Acknowledgment acknowledgment){InetAddress address = null;try {address = InetAddress.getLocalHost();} catch (UnknownHostException e) {e.printStackTrace();}System.out.println("当前的IP地址是:"+address.getHostAddress());System.out.println("监听服务A-收到的消息是::");System.out.println(consumerRecord.value().toString());System.out.println("=================== end =================");
//        ack 提交掉,避免服务重启再次拉取到消息acknowledgment.acknowledge();}

然后我们给该服务起2个实例,即模拟该组内serverGroup1内的2个消费者,然后我们使用测试方法进行测试,向该topic内发送多个消息,观察2个实例的输出日志:

实例1:

发送的消息是:111
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“111=================== end =================
发送的消息是:222
发送的消息是:333
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“333=================== end =================
发送的消息是:444
发送的消息是:555
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“555=================== end =================
发送的消息是:666
发送的消息是:777
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“777=================== end =================
发送的消息是:888
发送的消息是:999
当前的IP地址是:10.244.3.114
监听服务A-收到的消息是::
“999-----------------------------------

实例2:

当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“222=================== end =================
当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“444=================== end =================
当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“666=================== end =================
当前的IP地址是:10.244.0.237
监听服务A-收到的消息是::
“888”
发现该组内的一个消费者消费到了111,333,555,777,999 ,另外一个消费者消费到了222,444,666,888,起到了均衡消费的效果。

所以在微服务的集群中,我们可以通过给topic设置多个partition,然后让每一个实例对应消费1个partition的数据,从而实现并行的处理数据,可以显著地提高处理消息的速度。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/269768.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

bind、call、apply 区别?

作用 call、apply、bind作用是改变函数执行时的上下文&#xff0c;简而言之就是改变函数运行时的this指向 那么什么情况下需要改变this的指向呢&#xff1f; 下面举个例子 var name "lucy"; var obj {name: "martin",say: function () {console.log(…

深入理解 SVG:开启向量图形的大门(下)

&#x1f90d; 前端开发工程师&#xff08;主业&#xff09;、技术博主&#xff08;副业&#xff09;、已过CET6 &#x1f368; 阿珊和她的猫_CSDN个人主页 &#x1f560; 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 &#x1f35a; 蓝桥云课签约作者、已在蓝桥云…

nodejs+vue+微信小程序+python+PHP社区居民信息管理及数据分析系统-计算机毕业设计推荐django

社区居民信息管理及数据分析与可视化系统可以为社区领导提供业务管理功能&#xff0c;社区领导也就是系统的管理员&#xff0c;具有社区居民管理、流入人口管理、流出人口管理、社区信息管理、流出协同管理、公告管理的权限&#xff0c; 本文先充分调查社区居民信息管理及数据分…

计网 - LVS 是如何直接基于 IP 层进行负载平衡调度

文章目录 模型LVS的工作机制初探LVS的负载均衡机制初探 模型 大致来说&#xff0c;可以这么理解&#xff08;只是帮助我们理解&#xff0c;实际上肯定会有点出入&#xff09;&#xff0c;对于我们的 PC 机来说&#xff0c;物理层可以看成网卡&#xff0c;数据链路层可以看成网卡…

【ARM Trace32(劳特巴赫) 使用介绍 13 -- Trace32 断点 Break 命令篇】

文章目录 1. Break.Set1.1 TRACE32 Break1.1.1 Break命令控制CPU的暂停1.2 Break.Set 设置断点1.2.1 Trace32 程序断点1.2.2 读写断点1.2.2.1 变量被改写为特定值触发halt1.2.2.2 设定非值触发halt1.2.2.4 变量被特定函数改写触发halt1.2.3 使用C/C++语法设置断点条件1.2.4 使用…

机器学习---Boosting

1. Boosting算法 Boosting思想源于三个臭皮匠&#xff0c;胜过诸葛亮。找到许多粗略的经验法则比找到一个单一的、高度预 测的规则要容易得多&#xff0c;也更有效。 预测明天是晴是雨&#xff1f;传统观念&#xff1a;依赖于专家系统&#xff08;A perfect Expert) 以“人无…

【SpringBoot篇】详解基于Redis实现短信登录的操作

文章目录 &#x1f970;前言&#x1f6f8;StringRedisTemplate&#x1f339;使用StringRedisTemplate⭐常用的方法 &#x1f6f8;为什么我们要使用Redis代替Session进行登录操作&#x1f386;具体使用✨编写拦截器✨配置拦截器&#x1f33a;基于Redis实现发送手机验证码操作&am…

matlab 最小二乘拟合平面(拉格朗日乘子法)

目录 一、算法原理二、代码实现三、结果展示本文由CSDN点云侠原创,原文链接。博客长期更新,爬虫自重。 一、算法原理 设拟合出的平面方程为: a x + b y &#

如何利用宝塔面板和docker快速部署网站

当你有了一台服务器&#xff0c;就会折腾往这台服务器上部署各种好玩的网站。市面上有许多开源的网站项目&#xff0c;通过docker技术可以快速部署并使用&#xff0c;本文将以部署filebrowser举例介绍网站部署的基本流程。 1. 安装宝塔面板 宝塔面板是一款开源的网站运维工具…

5. PyTorch——数据处理模块

1.数据加载 在PyTorch中&#xff0c;数据加载可通过自定义的数据集对象。数据集对象被抽象为Dataset类&#xff0c;实现自定义的数据集需要继承Dataset&#xff0c;并实现两个Python魔法方法&#xff1a; __getitem__&#xff1a;返回一条数据&#xff0c;或一个样本。obj[in…

IEEE Transactions on Industrial Electronics工业电子TIE论文投稿须知

一、背景 IEEE TIE作为控制领域的TOP期刊&#xff0c;接收机器人、控制、自动驾驶、仪器和传感等方面的论文&#xff0c;当然范围不止这些&#xff0c;感兴趣的可以自行登录TIE官网查看。所投稿论文必须经过实验验证&#xff0c;偏工程应用类&#xff0c;当然也必须有方法上的…

RocketMQ-RocketMQ常见问题与总结

一、RocketMQ如何保证消息不丢失 ​ 这个是在面试时&#xff0c;关于MQ&#xff0c;面试官最喜欢问的问题。这个问题是所有MQ都需要面对的一个共性问题。大致的解决思路都是一致的&#xff0c;但是针对不同的MQ产品又有不同的解决方案。分析这个问题要从以下几个角度入手&…