RocketMQ生产者消息发送出去了,消费者一直接收不到怎么办?(Rocket MQ订阅关系一致性)

问题: 使用RocketMQ消息队列,生产者将数据发送出去了,但是生产者一致没接收到(或者是间隔好几分钟,突然接收到一条数据)怎么办?并且通过rocket web控制台查看消息的状态为NOT_ONELINE或者NOT_CONSUME,(如下图) 这种诡异现象该怎么解决?
在这里插入图片描述

1. 先说解决方案

这种情况99%是由于订阅关系不一致导致的,可以排查下程序看看是否有多个消费者使用了同一个group,并且订阅了不同的主题。逻辑图展示如下:
在这里插入图片描述
这种情况只需要将不同的消费者的group区分一下即可, 逻辑关系图变成如下这种:
在这里插入图片描述

到此为止,是不是惊奇的发现,问题解决了?

2. 注意事项:订阅关系一致性

看下Rocket MQ官方文档给出的说明:

定义
消费者分组是 Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组。

和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。

在这里插入图片描述
这里面只描述出了Tag的一致,事实上下面这种订阅关系也是错误的,同一个group中的两个消费者分别订阅了不同的主题, 违背了定义中的消费行为一致原则:

//Consumer c1
Consumer c1 = ConsumerBuilder.build(groupA);
c1.subscribe(topicA);
//Consumer c2Consumer 
c2 = ConsumerBuilder.build(groupA);
c2.subscribe(topicB);

3. 剖析源码实现,分析原因

从GitHub下载rocketmq源码通过idea打开之后,从官方提供的example进来:
在这里插入图片描述
进入到DefaultMQPushConsumer构造方法中,可以发现初始化了一个DefaultMQPushConsumerImpl类:

    public DefaultMQPushConsumer(final String namespace, final String consumerGroup, RPCHook rpcHook,AllocateMessageQueueStrategy allocateMessageQueueStrategy) {this.consumerGroup = consumerGroup;this.namespace = namespace;this.allocateMessageQueueStrategy = allocateMessageQueueStrategy;// 这里初始化一个默认的push类型的Consumer实现类defaultMQPushConsumerImpl = new DefaultMQPushConsumerImpl(this, rpcHook);}

然后继续进入到DefaultMQPushConsumerImpl类中, 可以看见有一个成员变量MQClientInstance mQClientFactory, 在DefaultMQPushConsumerImpl类的start()(启动消费者)方法中会通过MQClientManager初始化MQClientInstance类.
在这里插入图片描述
接着跳转到MQClientInstance构造方法中, 会发现有这样一行代码, 初始化了一个rebalanceService. 这个rebalanceService就是RocketMQ隔一段时间进行rebalance的核心实现.
在这里插入图片描述
继续剖析RebalanceService类, 发现其实现了Runnable接口, 话不多说, 直接看其 run()方法中做了什么事.

呀! 原来是隔一段时间调用一次上述咱们提到的DefaultMQPushConsumerImpl类中的doRebalance()方法, 搞了半天又绕回来了. … … … … … …

直接进入到这里面, 看看rebalance的逻辑:
在这里插入图片描述
集群部署模式下, 会进行rebalance操作, 根据topic名称和group名称获取到所有的consumer列表.

case CLUSTERING: {Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);// 这里根据topic名称和Group进行获取到所有的consumerList<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);if (null == mqSet) {if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {this.messageQueueChanged(topic, Collections.<MessageQueue>emptySet(), Collections.<MessageQueue>emptySet());log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);}}

但是进去这行代码里面发现, topic名称仅仅用来获取Broker的网络地址, 真正获取到所有Consumer列表的是通过Group名称获取的, 看到这里相信大家基本上能够恍然大悟. 回归到上面的问题: 如果一个一个Group中的多个消费者分别订阅了不同的主题, 即: 消费行为不一致, 无论这个属于当前Group中的消费者是否订阅了这个主题, 都会参与rebalance.
在这里插入图片描述
画图解释一下, 假设在同一个Group下, 两个Consumer都分别订阅了Topic1和Topic2, 这种情况订阅关系一致,
在这里插入图片描述
假设消费者1消费Topic2的速度比较快, 经过一次rebalance之后, Consumer订阅的队列逻辑有可能成为这样的:
在这里插入图片描述
此时由于订阅关系的一致性, 整体系统并不会出现问题. 接下来看一种情况, 同一个消费组中的Consumer1 订阅了Topic1, Consumer2订阅了Topic2, 初始情况逻辑关系是这样:
在这里插入图片描述
由于进行rebalance是通过Group获取对应的消费者客户端ID, 因此rebalance之后可能出现Consumer1 指向了Topic2中的某一个队列, 同理, Consumer2指向了Topic1中的队列. 但是这与Consumer中设定的topic不一致, 因此会出现RocketMQ中消息状态为为NOT_COMSUME_YET

(个人通过对源码的简单梳理总结的文章, 如有错误欢迎指正)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/155751.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python爬取读书网的图片链接和书名并保存在数据库中

一个比较基础且常见的爬虫&#xff0c;写下来用于记录和巩固相关知识。 一、前置条件 本项目采用scrapy框架进行爬取&#xff0c;需要提前安装 pip install scrapy# 国内镜像 pip install scrapy -i https://pypi.douban.com/simple 由于需要保存数据到数据库&#xff0c;因…

[LeetCode]-27. 移除元素-26.删除有序数组中的重复项-88.合并两个有序数组

目录 27.移除元素 题目 思路 代码 26. 删除有序数组中的重复项 题目 思路 代码 88.合并两个有序数组 题目 思路 代码 总结 27.移除元素 27. 移除元素 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/remove-element/description/ 题目 给你一…

843. n-皇后问题

文章目录 QuestionIdeasCode Question n− 皇后问题是指将 n 个皇后放在 nn 的国际象棋棋盘上&#xff0c;使得皇后不能相互攻击到&#xff0c;即任意两个皇后都不能处于同一行、同一列或同一斜线上。 现在给定整数 n &#xff0c;请你输出所有的满足条件的棋子摆法。 输入格…

多路IO—POll函数,epoll服务器开发流程

引言 "在计算机网络编程中&#xff0c;多路IO技术是非常常见的一种技术。其中&#xff0c;Poll函数和Epoll函数是最为常用的两种多路IO技术。这两种技术可以帮助服务器端处理多个客户端的并发请求&#xff0c;提高了服务器的性能。本文将介绍Poll和Epoll函数的使用方法&am…

JavaScript基础知识19——循环结构:while循环

哈喽&#xff0c;你好&#xff0c;我是雷工。 本节学习JavaScript基础语法的循环结构&#xff1a;while循环&#xff0c;以下为学习笔记。 while循环 循环概念&#xff1a;重复执行一些操作&#xff1b; 循环特征&#xff1a;不断地重复&#xff1b; while&#xff1a;在…期间…

NEFU数字图像处理(三)图像分割

一、图像分割的基本概念 1.1专有名词 前景和背景 在图像分割中&#xff0c;我们通常需要将图像分为前景和背景两个部分。前景是指图像中我们感兴趣、要分割出来的部分&#xff0c;背景是指和前景不相关的部分。例如&#xff0c;对于一张人物照片&#xff0c;人物就是前景&…

浅析Redis大Key | 京东云技术团队

一、背景 在京东到家购物车系统中&#xff0c;用户基于门店能够对商品进行加车操作。用户与门店商品使用Redis的Hash类型存储&#xff0c;如下代码块所示。不知细心的你有没有发现&#xff0c;如果单门店加车商品过多&#xff0c;或者门店过多时&#xff0c;此Key就会越来越大…

三、 链表

一、链表的定义 链表是一种动态数据结果&#xff0c;内存分配不是在创建链表时一次性完成的&#xff0c;每添加一个节点&#xff0c;分配一次内存&#xff0c;由于没有闲置的内存&#xff0c;链表的空间效率高于数组 二、定义单向链表 struct ListNode {int m_nValue;ListNo…

架构设计之大数据架构(Lambda架构、Kappa架构)

大数据架构 一. 大数据技术生态二. 大数据分层架构三. Lambda架构3.1 Lambda架构分解为三层3.2 优缺点3.3 实际案例 四. Kappa架构4.1 结构图4.2 优缺点4.3 实际案例 五. Lambda架构与Kappa架构对比 其它相关推荐&#xff1a; 系统架构之微服务架构 系统架构设计之微内核架构 鸿…

CentOS停更沉寂,RHEL巨变限制源代:Docker容器化技术的兴起助力操作系统新格局

一、概述 操作系统是计算机系统的核心软件&#xff0c;它管理和控制着计算机的硬件和软件资源&#xff0c;为用户和应用程序提供了一个统一、高效、安全的运行环境。操作系统的发展历史也是计算机技术的发展历史的重要组成部分&#xff0c;它见证了计算机从单机到网络&#xf…

宝塔面板安装Python和Flask(新版Python项目)

&#xff08;一&#xff09;宝塔面板的项目菜单&#xff0c;打开Python项目的“项目版本管理” 安装Python版本3.10.0。 会创建一个Python版本的文件夹www/server/pyproject_evn/versions/ 会创建一个Python虚拟环境的文件夹www/server/pyproject_evn/python_venv/ &#xf…

C#__委托delegate

委托存储的是函数的引用&#xff08;把某个函数赋值给一个委托类型的变量&#xff0c;这样的话这个变量就可以当成这个函数来进行使用了&#xff09; 委托类型跟整型类型、浮点型类型一样&#xff0c;也是一种类型&#xff0c;是一种存储函数引用的类型 using System.Reflec…