【分布式websocket】RocketMQ发送消息保证消息最终一致性需要做哪些处理?【第15期】

前言

发送消息的逻辑在发给mq后消息链路就直接结束了。那么消息在mq收到后我们该如何保证后面消息一定能存储呢?执行业务逻辑出错了怎么办呢?当然这期只是IM特定场景下的一致性哈,不是分布式事务哈,有点小区别?使用MQ做分布式事务后面肯定也会发文章。
在这里插入图片描述

目前已经写的文章有。并且有对应视频版本。
git项目地址 【IM即时通信系统(企聊聊)】点击可跳转
sprinboot单体项目升级成springcloud项目 【第一期】
前端项目技术选型以及页面展示【第二期】
分布式权限 shiro + jwt + redis【第三期】
给为服务添加运维模块 统一管理【第四期】
微服务数据库模块【第五期】
netty与mq在项目中的使用(第六期(废弃))】
分布式websocket即时通信(IM)系统构建指南【第七期】
分布式websocket即时通信(IM)系统保证消息可靠性【第八期】
分布式websocket IM聊天系统相关问题问答【第九期】
什么?websocket也有权限!这个应该怎么做?【第十期】
分布式ID是什么,以美团Leaf为例改造融入自己项目【第十一期】
IM聊天系统为什么需要做消息幂等?如何使用Redis以及Lua脚本做消息幂等【第12期】
微信发送一条消息经历哪些过程。企业微信以及钉钉的IM架构对比【第13期】
微信群为什么上限是500人,IM设计系统中的群聊的设计难点【第14期】
B站上面关注我呐 B站和CSDN同名,1000粉丝后建群。然后B站关注我后可以私信CSDN来的,然后后面我建群的时候拉你!

问题回顾

简单的说问题就是IM发送一条消息投递到mq里面,然后mq如何能保证这条消息落库。确保消息一定能落库涉及到消息的可靠生产和消费。我们这篇文章先分析一下技术方案,然后进行选择,一步一步来。 以下是一些可以选择的步骤和策略:

    1. 可靠生产
      同步发送:在发送消息时,使用 RocketMQ 的同步发送方式,这样生产者会等待消息服务器的响应,确保消息已经被成功接收。
      消息发送重试:对于发送失败的消息,可以在生产者端进行重试。RocketMQ 客户端自带重试机制,但你也可以根据业务需求自定义重试策略。
      事务消息:如果消息发送是业务操作的一部分,可以使用 RocketMQ 的事务消息来保证本地事务和消息发送的原子性。这样,只有当本地事务成功提交时,消息才会被真正发送出去。
    1. 可靠消费
      消费状态确认:在消费者处理完消息并成功将其落库后,必须正确地返回消费成功的状态。这样,RocketMQ 不会再次投递这条消息。
      幂等性处理:为了防止消息被重复消费(例如,在消息重试的情况下),消费者在处理消息时需要实现幂等性。这通常通过在数据库中记录每条消息的唯一标识,并在处理前检查该标识是否已存在来实现。
      死信队列处理:对于无法成功处理的消息(如因为业务逻辑错误或系统异常),应该将其转移到死信队列。然后可以通过人工干预或自动化脚本处理这些消息,确保它们最终被正确处理。
    1. 消息监控和告警
      监控:利用 RocketMQ 提供的监控工具,如 RocketMQ Dashboard,监控消息的生产和消费状态,及时发现问题。
      告警:设置合理的告警规则,当发现消息堆积、消费延迟等异常情况时,及时通知相关人员进行处理。

选择技术方案

从上面可靠生产的方案选择异步发送重试策略。同步对系统性能影响大了。所以选择异步加重试。

采用如下流程

  1. 消息发送:IM 服务在用户发送消息时,通过 RocketMQ 的异步发送将消息投递到消息队列。
  2. 消息消费:消息消费服务监听消息队列,收到消息后进行处理,包括检查消息的幂等性,并将消息内容落库。
  3. 确认消费:消息成功落库后,消费者返回消费成功的状态给 RocketMQ,确保消息不会被重复投递。
  4. 异常处理:如果消息消费失败,利用 RocketMQ 的重试机制进行重试,或者将消息转移到死信队列,等待后续处理。
    通过上述步骤和策略,可以在使用 RocketMQ 时,尽可能地保证 IM 消息的可靠投递和落库。

rocketmq内置重试机制

  1. 消费者重试:对于消费者来说,如果消费消息失败(即消费逻辑抛出异常),消息会被标记为“重试消息”并稍后重新投递。RocketMQ 默认会重试 16 次,每次重试的间隔时间逐渐增加,从 1 秒到最多 2 小时。
  2. 生产者重试:对于同步发送的消息,如果发送失败,生产者客户端会根据配置的重试次数自动重试。异步发送和单向发送则需要开发者在回调函数中自行处理重试逻辑。
    自定义重试策略

开发者可以根据业务需求自定义重试策略,这包括但不限于以下几种方式:

  1. 修改默认重试次数和间隔:通过配置文件或编程方式,可以调整默认的重试次数和重试间隔时间,使其更适合特定的业务场景。
  2. 自定义消费失败处理逻辑:在消费者处理消息的逻辑中,可以捕获异常,并根据异常类型或其他业务条件决定是否重试、重试的次数和间隔,或者将消息转移到另一个队列等待人工处理。
  3. 异步发送的自定义重试:对于异步发送的消息,可以在发送失败的回调函数中实现自定义的重试逻辑,比如基于异常类型选择是否重试、重试的次数和间隔等。
  4. 使用延时消息实现重试:对于需要延迟重试的场景,可以将失败的消息作为延时消息重新发送,通过设置延时级别来控制重试的时间。
  5. 持久化重试队列:对于重试次数多、间隔长或需要特殊处理的消息,可以将其持久化到数据库或日志文件中,通过外部程序或定时任务控制重试逻辑。
    通过这些自定义策略,开发者可以根据业务的具体需求和特点,设计更加灵活和可靠的消息重试机制,以提高消息处理的成功率和系统的健壮性。

可靠生产代码演示

使用一个简单的例子
先添加依赖

<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.2.0</version>
</dependency>

代码块

import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class MessageService {@Autowiredprivate RocketMQTemplate rocketMQTemplate;public void sendMessageAsync(String topic, String message) {rocketMQTemplate.asyncSend(topic, message, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 消息发送成功的处理System.out.println("Message sent successfully: " + sendResult);}@Overridepublic void onException(Throwable e) {// 消息发送失败的处理System.err.println("Failed to send message: " + e.getMessage());// 这里可以实现重试逻辑,例如:// retrySendMessage(topic, message);}});}// 重试发送消息的方法,这里简单地递归调用自身,实际应用中可能需要限制重试次数private void retrySendMessage(String topic, String message) {System.out.println("Retrying to send message...");sendMessageAsync(topic, message);}
}

在上述代码中,sendMessageAsync方法通过RocketMQTemplate的asyncSend方法异步发送消息,并提供了一个SendCallback回调来处理发送成功和失败的情况。在onException方法中,我们可以实现重试逻辑。这里的示例仅仅是简单地递归调用sendMessageAsync方法进行重试,实际应用中需要更复杂的重试策略,比如限制重试次数、设置重试间隔等。
请注意,过多的重试可能会对系统造成压力,特别是在高并发场景下。因此,设计重试策略时需要谨慎,确保既能提高消息发送的成功率,又不会对系统造成过大的负担。目前自己IM觉的重试三次吧。

可靠消息代码演示

  1. 定义消费者并设置重试次数
    首先,需要在消费者配置中设置最大重试次数。RocketMQ默认会重试16次,每次重试间隔会逐渐增加。如果使用Spring Boot集成的RocketMQ,可以通过配置文件设置重试次数,或者在消费者注解中直接指定。
  2. 消费逻辑中处理异常
    在消费逻辑中,捕获并处理可能发生的异常。如果消费失败,抛出一个自定义异常或特定的异常标记,RocketMQ会根据重试策略重新投递消息。
  3. 监听重试次数并转移至人工队列
    RocketMQ不直接支持在消费端设置重试次数后自动转移队列的功能。因此,需要在消费逻辑中自行实现。可以通过消息的RECONSUME_TIME属性来判断当前重试次数,如果达到预设的最大重试次数后仍然失败,则手动将消息发送到另一个队列。
    以下是一个简化的示例,展示如何在Spring Boot应用中使用RocketMQ实现这一逻辑:
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "your-topic", consumerGroup = "your-consumer-group", consumeMode = ConsumeMode.ORDERLY)
public class YourConsumerService implements RocketMQListener<Message<String>> {@Autowiredprivate RocketMQTemplate rocketMQTemplate;private static final int MAX_RECONSUME_TIMES = 3; // 最大重试次数@Overridepublic void onMessage(Message<String> message) {try {// 处理消息System.out.println("Received message: " + message.getPayload());// 如果处理失败,抛出异常// throw new Exception("处理失败");} catch (Exception e) {Integer reconsumeTimes = message.getHeaders().get("RECONSUME_TIMES", Integer.class);if (reconsumeTimes != null && reconsumeTimes >= MAX_RECONSUME_TIMES) {// 超过最大重试次数,转移到人工队列rocketMQTemplate.convertAndSend("manual-handle-topic", message.getPayload());System.out.println("Transfer to manual handle queue: " + message.getPayload());} else {// 未达到最大重试次数,抛出异常,让RocketMQ进行重试throw new RuntimeException("Consume failed, need retry.");}}}
}

记录重试还失败的消息

配置一个专门的消费者,一般来说可以通过Web界面、日志、邮件等展示给操作人员进行人工处理。但是我们这里就简单的存储一下库吧,然后记录下失败的时候,记录下原因。方便后续分析原因。解释下原因。因为是IM系统时效性很重要的。人工处理黄花菜都凉了。所以一般使用自动化处理策略。我们的解决办法是先重试,重试还失败通知用户发送消息失败。以并发优先。以数据库数据为准。

import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;@Service
@RocketMQMessageListener(topic = "manual-handle-topic", consumerGroup = "manual-handle-group", consumeMode = ConsumeMode.ORDERLY)
public class ManualHandleConsumer implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {// 将消息展示给操作人员进行处理// 例如,打印消息、发送邮件、记录日志等System.out.println("Manual handle message: " + message);// 或者将消息存储到数据库,通过Web界面供操作人员处理}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/505644.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

linux系统Jenkins工具流水线项目发布

流水线发布 pipline语法介绍阶段指令 创建流水线项目流水线脚本基础框架 pipline语法介绍 声明式的pipeline语法格式 1. 所有的声明都必须包含在pipeline{}中 2. 块只能有节段&#xff0c;指令&#xff0c;步骤或者赋值语句组成 3. 阶段&#xff1a;agent&#xff0c;stages&a…

手动实现一个简单的 HTTP 请求

本文我们通过 Socket&#xff0c;写一个 HTTP 协议&#xff0c;直观的感受一下上篇文章中的请求和响应。 定义 socket server 通过上篇文章&#xff0c;我们知道 HTTP 协议底层是通过 Socket 实现的&#xff0c;所以我们先通过 socket 定义一个 server import socket#初始化 …

如何将一个远程git的所有分支推到另一个远程分支上

如何将一个远程git的所有分支推到另一个远程分支上 最初有 12 个分支 执行 git remote add 远程名 远程git地址 git push 远程名 --tags "refs/remotes/origin/*:refs/heads/*"之后就变成 26个分支

STM32单片机示例:ETH_DP83848_DHCP_NonOS_Poll_F407

文章目录 目的基础说明主要配置关键代码示例演示示例链接关于中断总结 目的 以太网是比较常用到的功能&#xff0c;这篇文章讲演示在STM32F407上启用以太网功能&#xff0c;使之能够加入网络中&#xff0c;通过DHCP获得IP地址&#xff0c;可以被Ping通。 基础说明 STM32F407…

Docker_设置docker服务以及容器开机自启

本文目录 docker服务开机自启动查询docker服务开机自启动状态将docker服务设置为开机自启动取消docker服务开机自启动 容器开机自启动修改docker容器为自启动容器启动时设置自启动-docker版容器启动时设置自启动-docker-compose版 docker服务开机自启动 查询docker服务开机自启…

对称加密与非对称加密

1、对称加密 对称加密&#xff0c;即采用对称的密码编码技术&#xff0c;他的特点是&#xff0c;加密和解密使用相同的秘钥。 常见的对称加密算法有DES、3DES、Blowfish、IDEA、RC4、RC5、RC6和AES。 优点&#xff1a;对称加密算法使用起来简单快捷&#xff0c;密钥较短&…

STM32作为SPI slave与主机异步通信

背景 最近被测试提了个BUG&#xff0c;说某款产品在用户按下前面板的按键后&#xff0c;对应的按键灯没有亮起来。前面板跟主机是通过SPI口通信&#xff0c;前面板是从机&#xff0c;从机想要主动发送消息&#xff0c;需要通过GPIO中断来通知主机&#xff1a; 上图前面板是ST…

小程序图形:echarts-weixin 入门使用

去官网下载整个项目&#xff1a; https://github.com/ecomfe/echarts-for-weixin 拷贝ec-canvs文件夹到小程序里面 index.js里面的写法 import * as echarts from "../../components/ec-canvas/echarts" const app getApp(); function initChart(canvas, width, h…

WEB漏洞 SSRF简单入门实践

一、漏洞原理 SSRF 服务端请求伪造 原理&#xff1a;在某些网站中提供了从其他服务器获取数据的功能&#xff0c;攻击者能通过构造恶意的URL参数&#xff0c;恶意利用后可作为代理攻击远程或本地的服务器。 二、SSRF的利用 1.对目标外网、内网进行端口扫描。 2.攻击内网或本地的…

复试PAT乙级day34

1111~1115 1113 很难&#xff0c;看了题解 人类习惯用 10 进制&#xff0c;可能因为大多数人类有 10 根手指头&#xff0c;可以用于计数。这个世界上有一种叫“钱串子”&#xff08;学名“蚰蜒”&#xff09;的生物&#xff0c;有 30 只细长的手/脚&#xff0c;在它们的世界里…

实践航拍小目标检测,基于YOLOv8全系列【n/s/m/l/x】参数模型开发构建无人机航拍场景下的小目标检测识别分析系统

关于无人机相关的场景在我们之前的博文也有一些比较早期的实践&#xff0c;感兴趣的话可以自行移步阅读即可&#xff1a; 《deepLabV3Plus实现无人机航拍目标分割识别系统》 《基于目标检测的无人机航拍场景下小目标检测实践》 《助力环保河道水质监测&#xff0c;基于yolov…

在网页上踢球:打造我自己的python(Django)足球网站

足球不仅仅是球场上的90分钟。它是一个不断发展的故事&#xff0c;一个全球球迷社群的粘合剂&#xff0c;一个数据和热情交织的世界。作为一名开发者和球迷&#xff0c;我决定将这两大爱好结合起来&#xff0c;用 Django 打造一个足球网站&#xff0c;让球迷们能够追踪他们最爱…