【Java核心能力】RocketMQ 针对消息有序和消息积压的处理

欢迎关注公众号(通过文章导读关注:【11来了】),及时收到 AI 前沿项目工具及新技术的推送!

在我后台回复 「资料」 可领取编程高频电子书
在我后台回复「面试」可领取硬核面试笔记

文章导读地址:点击查看文章导读!

感谢你的关注!

RocketMQ 如何保证消息有序?

RocketMQ 保证消息的有序性分为了两种:

  • 全局有序: 适用于并发度不大,并且对消息要求严格一致性的场景下

    通过创建一个 topic,并且该 topic 下只有一个队列,那么生产者向着一个队列中发消息,消费者也在这一个队列中消费消息,来保证消息的有序性

  • 局部有序: 适用于对性能要求比较高的场景,在设计层面将需要保证有序的消息放在 Topic 下的同一个队列即可保证有序

那么一般情况下,我们只需要保证局部有序即可,那么为了保证局部有序,可以在发送消息时,指定一个 MessageSelector 对象,来指定消息发送到哪一个 Message Queue 中去,将需要保证有序的消息发送到同一个 Message Queue 来保证消息的局部有序性

这里说一下如何保证消息的局部有序:

将需要保证有序的消息放在 Topic 下的同一个 Message Queue 即可,如下图:

1702709559999

代码如下,在发送消息的时候,指定 MessageSelector 对象 来将需要保证有序的消息发送到同一个队列中去即可:

/*** 这里发送消息的时候,根据 orderId 来选择对应发送的队列*/
producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {int orderId = (int)arg;int idx = orderId % mqs.size();return mqs.get(idx);}
}, order.orderId);

上边在 发送消息时保证了消息的有序性 ,那么在 消费消息 时也需要保证消息的有序消费,RocketMQ 的 MessageListener 回调函数提供了两种消费模式:

  • 有序消费: MessageListenerOrderly
  • 并发消费: MessageListenerConcurrently

为了保证有序消费,需要保证消费者注册 MessageListenerOrderly 的回调函数,来实现 顺序消费

上边两种消费方式都是使用线程池去消费消息,只不过在 MessageListenerOrderly 通过分布式锁和本地锁来保证同时只有一条线程去队列中消费数据,以此来保证顺序消费

但是使用了 MessageListenerOrderly 顺序消费会导致 两个问题:

  • 使用了锁,导致吞吐量下降
  • 前一个消息阻塞时,会导致后边消息都被阻塞。因此如果消息消费失败,要设置好最大重试

RocketMQ 消息积压如何处理?

事发时处理

RocketMQ 发生了消息积压, 事发时 一般有两种处理方式:

  • 增加消费者的数量: 如果 Topic 下的 Message Queue 有很多,可以通过 增加消费者的数量 来处理消息积压,如果 Topic 下的 Message Queue 有很多,那么每个消费者是会分配一个或多个 Message Queue 进行消费的,那么此时就可以通过增加消费者的数量,来加快该 Topic 中消息的消费速度
  • 新建 Topic 进行消息迁移: 如果 Topic 下的 Message Queue 很少, 那么此时增加消费者的数量也没有用了,可以临时 新创建一个 Topic ,并且将该 Topic 的 Message Queue 设置多一点,再新创建一组消费者将原 Topic 中的消息转发到新 Topic 中,此时就可以对新 Topic 采用增加消费者数量的方式来处理消息积压了

如何增加消费者的数量:

增加消费者的数量的话,可以通过 增加机器 或者在已有的机器上 启动新的进程 来实现

这里增加消费者的数量是有依据的,比如一个 Topic 下有 8 个 MessageQueue,那么最多将消费者数量增加到 8 个,因为 Topic 下一个队列只可以被同一消费者组的一个消费者消费,如果消费者的数量比 Topic 下的队列数量多的话,会有部分消费者分不到队列,因此消费者数量最多和 Topic 下的队列数量相同

上边说了增加消费者的数量来处理消息积压,还可以通过 提高单个消费者的消费能力 ,来尽快处理消息,避免消息积压

如何提高单个消费者的并发线程数:

提高单个消费者的消费并发线程,在 5.x 之前可以通过修改 DefaultMQPushConsumer 类中的 consumeThreadMinconsumeThreadMax 来提高单个消费者的并发能力(调整消费者的线程池的线程数量),在 5.x 版本可以通过PushConsumerBuilder.setConsumptionThreadCount() 设置线程数,SimpleConsumer可以由业务线程自由增加并发,底层线程安全

针对消息积压问题,提前预防

提前预防的话,主要可以从以下几个方面来考虑:

  • 生产者

对于生产者,可以进行限流,并且评估 Topic 峰值流量,合理设计 Topic 下的队列数量,添加异常监控,及时发现

  • 存储端

可以将次要消息转移

  • 消费者

对于消费者来说,可以进行 降级 处理:将消息先落库,再去异步处理

并且要合理地根据 Topic 的队列数量和应用性能来部署响应的消费者机器数量

  • 上线前

在上线前,采用灰度发布,先灰度小范围用户进行使用,没问题之后,再全量发布

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/570993.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux 网卡配置 vlan/bond/bridge/macvlan/ipvlan 模式

linux 网卡模式 linux网卡支持非vlan模式、vlan模式、bond模式、bridge模式&#xff0c;macvlan模式、ipvlan模式等&#xff0c;下面介绍交换机端及服务器端配置示例。 前置要求&#xff1a; 准备一台物理交换机&#xff0c;以 H3C S5130 三层交换机为例准备一台物理服务器&…

5个免费的3D钣金CAD软件

如果你正在设计简单的折叠钣金零件&#xff0c;则只需设计一些具有圆角半径的法兰&#xff1a;一个简单的钣金模块。 首先&#xff0c;你可以采用老式方式绘图并以 2D 方式完成所有操作。 许多传统制造商仍在使用 2D DWG 和 DXF 图纸。 因此&#xff0c;你很有可能只需快速起草…

工具推荐-eNSP(Enterprise Network Simulation Platform)

一.简介 1.1 一款由华为提供的免费的、可扩展的、图形化操作的网络仿真工具平台。 1.2 主要对企业网络路由器、交换机进行软件仿真&#xff0c;完美呈现真实设备实景&#xff0c;支持大型网络模拟。 1.3 让广大用户有机会在没有真实设备的情况下能够模拟,进行模拟网络架构和建…

HTTP 请求走私实现以及攻击案例

HTTP 请求走私实现以及攻击案例。 HTTP请求走私(HTTP Request Smuggling)是一种Web安全漏洞,它涉及到HTTP协议的不安全实现,特别是在处理多个HTTP请求时。这种漏洞可以被利用在多种场景中,导致不同的安全问题。以下是一些主要的漏洞和应用场景: 1. 缓存投毒(Cache Pois…

鸿蒙操作系统-初识

HarmonyOS-初识 简述安装配置hello world1.创建项目2.目录解释3.构建页面4.真机运行 应用程序包共享包HARHSP 快速修复包 官方文档请参考&#xff1a;HarmonyOS 简述 1.定义&#xff1a;HarmonyOS是分布式操作系统&#xff0c;它旨在为不同类型的智能设备提供统一的操作系统&a…

Python 全栈体系【四阶】(二十)

第五章 深度学习 二、推荐系统 1. 推荐算法介绍 1.1 个性化推荐算法 人口属性 地理属性 资产属性 兴趣属性 1.2 推荐算法分支 协同过滤推荐算法基于内容的推荐算法混合推荐算法流行度推荐算法 1.3 推荐算法 为推荐系统选择正确的推荐算法是非常重要的决定。目前为止…

【Python】搭建 Python 环境

目 录 一.安装 Python二.安装 PyCharm 要想能够进行 Python 开发&#xff0c;就需要搭建好 Python 的环境 需要安装的环境主要是两个部分&#xff1a; 运行环境: Python开发环境: PyCharm 一.安装 Python (1) 找到官方网站 (2) 找到下载页面 选择 “Download for Windows”…

【linux深入剖析】基础IO操作 | 使用Linux库函数实现读写操作 | 文件相关系统调用接口

&#x1f341;你好&#xff0c;我是 RO-BERRY &#x1f4d7; 致力于C、C、数据结构、TCP/IP、数据库等等一系列知识 &#x1f384;感谢你的陪伴与支持 &#xff0c;故事既有了开头&#xff0c;就要画上一个完美的句号&#xff0c;让我们一起加油 目录 前言1.复习C文件IO相关操…

机器学习K-means算法

K-Means 算法&#xff08;K-Means算法、K-Means 中心值计算、K-Means 距离计算公式、K-Means 算法迭代步骤、K-Means算法实例&#xff09; 问题引入 给你如下两种图片&#xff0c;快读回答2个问题&#xff0c;问 图1 中有几类五谷杂粮&#xff1f;问 图2 中有几类五谷杂粮&…

git 常用操作记录(tag、remote、rebase等)

关于git的常用命令&#xff08;add、commit、pull、push、merge、stash等&#xff09;在之前的博文已经介绍过了&#xff0c;下面根据工作中遇到的问题&#xff0c;总结一些更为常用的命令使用方式。 1、版本标签tag tag是基于一次commit的&#xff0c;可以指定在某个分支的提…

第十一届蓝桥杯大赛第二场省赛试题 CC++ 研究生组-寻找2020

数据很恶心&#xff0c;但是考点挺友好~ 把测试数据黏贴到记事本中&#xff0c;知测试数据的行列数 然后根据规则判断2020是否出现&#xff0c;并累计其次数即可。 判断可能需要注意超出下标&#xff0c;可以索性把数组定大些。 #include<stdio.h> const int N 310; ch…

2024年中国集成电路产业链上中下游市场分析(附产业链图谱)

在产业数字化转型的大背景下&#xff0c;受益于智能手机、智能汽车等终端应用蓬勃发展与全球半导体产业链产能转移&#xff0c;我国集成电路产业规模持续增长。集成电路作为重要的半导体器件&#xff0c;是典型的知识密集型、技术密集型、资本密集和人才密集型的高科技产业&…