前言
发送消息的逻辑在发给mq后消息链路就直接结束了。那么消息在mq收到后我们该如何保证后面消息一定能存储呢?执行业务逻辑出错了怎么办呢?当然这期只是IM特定场景下的一致性哈,不是分布式事务哈,有点小区别?使用MQ做分布式事务后面肯定也会发文章。
目前已经写的文章有。并且有对应视频版本。
git项目地址 【IM即时通信系统(企聊聊)】点击可跳转
sprinboot单体项目升级成springcloud项目 【第一期】
前端项目技术选型以及页面展示【第二期】
分布式权限 shiro + jwt + redis【第三期】
给为服务添加运维模块 统一管理【第四期】
微服务数据库模块【第五期】
netty与mq在项目中的使用(第六期(废弃))】
分布式websocket即时通信(IM)系统构建指南【第七期】
分布式websocket即时通信(IM)系统保证消息可靠性【第八期】
分布式websocket IM聊天系统相关问题问答【第九期】
什么?websocket也有权限!这个应该怎么做?【第十期】
分布式ID是什么,以美团Leaf为例改造融入自己项目【第十一期】
IM聊天系统为什么需要做消息幂等?如何使用Redis以及Lua脚本做消息幂等【第12期】
微信发送一条消息经历哪些过程。企业微信以及钉钉的IM架构对比【第13期】
微信群为什么上限是500人,IM设计系统中的群聊的设计难点【第14期】
B站上面关注我呐 B站和CSDN同名,1000粉丝后建群。然后B站关注我后可以私信CSDN来的,然后后面我建群的时候拉你!
问题回顾
简单的说问题就是IM发送一条消息投递到mq里面,然后mq如何能保证这条消息落库。确保消息一定能落库涉及到消息的可靠生产和消费。我们这篇文章先分析一下技术方案,然后进行选择,一步一步来。 以下是一些可以选择的步骤和策略:
-
- 可靠生产
同步发送:在发送消息时,使用 RocketMQ 的同步发送方式,这样生产者会等待消息服务器的响应,确保消息已经被成功接收。
消息发送重试:对于发送失败的消息,可以在生产者端进行重试。RocketMQ 客户端自带重试机制,但你也可以根据业务需求自定义重试策略。
事务消息:如果消息发送是业务操作的一部分,可以使用 RocketMQ 的事务消息来保证本地事务和消息发送的原子性。这样,只有当本地事务成功提交时,消息才会被真正发送出去。
- 可靠生产
-
- 可靠消费
消费状态确认:在消费者处理完消息并成功将其落库后,必须正确地返回消费成功的状态。这样,RocketMQ 不会再次投递这条消息。
幂等性处理:为了防止消息被重复消费(例如,在消息重试的情况下),消费者在处理消息时需要实现幂等性。这通常通过在数据库中记录每条消息的唯一标识,并在处理前检查该标识是否已存在来实现。
死信队列处理:对于无法成功处理的消息(如因为业务逻辑错误或系统异常),应该将其转移到死信队列。然后可以通过人工干预或自动化脚本处理这些消息,确保它们最终被正确处理。
- 可靠消费
-
- 消息监控和告警
监控:利用 RocketMQ 提供的监控工具,如 RocketMQ Dashboard,监控消息的生产和消费状态,及时发现问题。
告警:设置合理的告警规则,当发现消息堆积、消费延迟等异常情况时,及时通知相关人员进行处理。
- 消息监控和告警
选择技术方案
从上面可靠生产的方案选择异步发送重试策略。同步对系统性能影响大了。所以选择异步加重试。
采用如下流程
- 消息发送:IM 服务在用户发送消息时,通过 RocketMQ 的异步发送将消息投递到消息队列。
- 消息消费:消息消费服务监听消息队列,收到消息后进行处理,包括检查消息的幂等性,并将消息内容落库。
- 确认消费:消息成功落库后,消费者返回消费成功的状态给 RocketMQ,确保消息不会被重复投递。
- 异常处理:如果消息消费失败,利用 RocketMQ 的重试机制进行重试,或者将消息转移到死信队列,等待后续处理。
通过上述步骤和策略,可以在使用 RocketMQ 时,尽可能地保证 IM 消息的可靠投递和落库。
rocketmq内置重试机制
- 消费者重试:对于消费者来说,如果消费消息失败(即消费逻辑抛出异常),消息会被标记为“重试消息”并稍后重新投递。RocketMQ 默认会重试 16 次,每次重试的间隔时间逐渐增加,从 1 秒到最多 2 小时。
- 生产者重试:对于同步发送的消息,如果发送失败,生产者客户端会根据配置的重试次数自动重试。异步发送和单向发送则需要开发者在回调函数中自行处理重试逻辑。
自定义重试策略
开发者可以根据业务需求自定义重试策略,这包括但不限于以下几种方式:
- 修改默认重试次数和间隔:通过配置文件或编程方式,可以调整默认的重试次数和重试间隔时间,使其更适合特定的业务场景。
- 自定义消费失败处理逻辑:在消费者处理消息的逻辑中,可以捕获异常,并根据异常类型或其他业务条件决定是否重试、重试的次数和间隔,或者将消息转移到另一个队列等待人工处理。
- 异步发送的自定义重试:对于异步发送的消息,可以在发送失败的回调函数中实现自定义的重试逻辑,比如基于异常类型选择是否重试、重试的次数和间隔等。
- 使用延时消息实现重试:对于需要延迟重试的场景,可以将失败的消息作为延时消息重新发送,通过设置延时级别来控制重试的时间。
- 持久化重试队列:对于重试次数多、间隔长或需要特殊处理的消息,可以将其持久化到数据库或日志文件中,通过外部程序或定时任务控制重试逻辑。
通过这些自定义策略,开发者可以根据业务的具体需求和特点,设计更加灵活和可靠的消息重试机制,以提高消息处理的成功率和系统的健壮性。
可靠生产代码演示
使用一个简单的例子
先添加依赖
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>
代码块
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessageAsync(String topic, String message) {rocketMQTemplate.asyncSend(topic, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 消息发送成功的处理System.out.println("Message sent successfully: " + sendResult);}@Overridepublic void onException(Throwable e) {// 消息发送失败的处理System.err.println("Failed to send message: " + e.getMessage());// 这里可以实现重试逻辑,例如:// retrySendMessage(topic, message);}});}// 重试发送消息的方法,这里简单地递归调用自身,实际应用中可能需要限制重试次数private void retrySendMessage(String topic, String message) {System.out.println("Retrying to send message...");sendMessageAsync(topic, message);}
}
在上述代码中,sendMessageAsync方法通过RocketMQTemplate的asyncSend方法异步发送消息,并提供了一个SendCallback回调来处理发送成功和失败的情况。在onException方法中,我们可以实现重试逻辑。这里的示例仅仅是简单地递归调用sendMessageAsync方法进行重试,实际应用中需要更复杂的重试策略,比如限制重试次数、设置重试间隔等。
请注意,过多的重试可能会对系统造成压力,特别是在高并发场景下。因此,设计重试策略时需要谨慎,确保既能提高消息发送的成功率,又不会对系统造成过大的负担。目前自己IM觉的重试三次吧。
可靠消息代码演示
- 定义消费者并设置重试次数
首先,需要在消费者配置中设置最大重试次数。RocketMQ默认会重试16次,每次重试间隔会逐渐增加。如果使用Spring Boot集成的RocketMQ,可以通过配置文件设置重试次数,或者在消费者注解中直接指定。 - 消费逻辑中处理异常
在消费逻辑中,捕获并处理可能发生的异常。如果消费失败,抛出一个自定义异常或特定的异常标记,RocketMQ会根据重试策略重新投递消息。 - 监听重试次数并转移至人工队列
RocketMQ不直接支持在消费端设置重试次数后自动转移队列的功能。因此,需要在消费逻辑中自行实现。可以通过消息的RECONSUME_TIME属性来判断当前重试次数,如果达到预设的最大重试次数后仍然失败,则手动将消息发送到另一个队列。
以下是一个简化的示例,展示如何在Spring Boot应用中使用RocketMQ实现这一逻辑:
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "your-topic", consumerGroup = "your-consumer-group", consumeMode = ConsumeMode.ORDERLY)
public class YourConsumerService implements RocketMQListener<Message<String>> {@Autowiredprivate RocketMQTemplate rocketMQTemplate;private static final int MAX_RECONSUME_TIMES = 3; // 最大重试次数@Overridepublic void onMessage(Message<String> message) {try {// 处理消息System.out.println("Received message: " + message.getPayload());// 如果处理失败,抛出异常// throw new Exception("处理失败");} catch (Exception e) {Integer reconsumeTimes = message.getHeaders().get("RECONSUME_TIMES", Integer.class);if (reconsumeTimes != null && reconsumeTimes >= MAX_RECONSUME_TIMES) {// 超过最大重试次数,转移到人工队列rocketMQTemplate.convertAndSend("manual-handle-topic", message.getPayload());System.out.println("Transfer to manual handle queue: " + message.getPayload());} else {// 未达到最大重试次数,抛出异常,让RocketMQ进行重试throw new RuntimeException("Consume failed, need retry.");}}}
}
记录重试还失败的消息
配置一个专门的消费者,一般来说可以通过Web界面、日志、邮件等展示给操作人员进行人工处理。但是我们这里就简单的存储一下库吧,然后记录下失败的时候,记录下原因。方便后续分析原因。解释下原因。因为是IM系统时效性很重要的。人工处理黄花菜都凉了。所以一般使用自动化处理策略。我们的解决办法是先重试,重试还失败通知用户发送消息失败。以并发优先。以数据库数据为准。
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "manual-handle-topic", consumerGroup = "manual-handle-group", consumeMode = ConsumeMode.ORDERLY)
public class ManualHandleConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 将消息展示给操作人员进行处理// 例如,打印消息、发送邮件、记录日志等System.out.println("Manual handle message: " + message);// 或者将消息存储到数据库,通过Web界面供操作人员处理}
}