RabbitMQ消息可靠性(一)-- 生产者消息确认

前言

在项目中,引入了RabbitMQ这一中间件,必然也需要在业务中增加对数据安全性的一层考虑,来保证RabbitMQ消息的可靠性,否则一个个消息丢失可能导致整个业务的数据出现不一致等问题,对系统带来巨大的影响,消息的可靠性可以主要在三个方面去考虑:生产者消息确认,消费者消息确认,消息持久化,这篇文件说明生产者消息确认的。


一、消息确认流程图

由图可知,消息确认是分为生产者确认和消费者确认的,生产者和MQ之间的消息确认机制为生产者消息确认,MQ和消费者之间的消息确认机制为消费者消息确认

消息丢失的情景有三种情况:

  1. 发送消息过程中出现网络问题:生产者以为发送成功,但MQ没有收到;(需要生产者消息确认)
  2. 接收到消息后由于MQ服务器宕机或重启等原因(消息默认存在内存中)导致消息丢失;(需要消息持久化)
  3. 消费者接收到消息后处理消息出错,没有完成消息的处理,但是自动返回ack(这时候需要开启手动确认模式,消费者消息确认)

二、生产者消息确认

RabbitMQ提供了publisher confirm机制来避免消息投递到MQ过程中丢失。这种机制下每个message都必须要有一个独一无二的ID,来区分开不同的消息,避免ack(消息确认参数)冲突。每当消息发送到MQ成功后,MQ都会返回一个结果给生产者,以保证生产者消息确认。在生产者消息确认时,又有两种返回结果方式(通常两个都要实现)来确保消息投递可靠性,分别为publisher-confirm和publisher-return,以下作出说明。

1、publisher-confirm(发送者确认)

消息成功投递到交换机,返回ack

消息未投递到交换机,返回nack

2、publisher-return(发送者回执)

消息投递到交换机了,但是没有路由到队列。返回ACK,及路由失败原因。


三、代码实现

1、配置文件

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guest#确认消息已发送到交换机(Exchange)publisher-confirm-type: correlated#确认消息已发送到队列(Queue)publisher-returns: true

publish-confirm-type有三个值,

  1. none:禁用发布确认模式,是默认值
  2. simple:同步等待confirm结果,直到超时
  3. correlated:异步回调,定义ConfirmCallback,MQ返回结果时会回调这个ConfirmCallback

publisher-returns:开启消息失败回调,回调函数ReturnCallback

2、配置ConfirmCallback函数和ReturnCallback函数

/*** 生产者消息回调配置类*/
@Configuration
@Slf4j
public class ProviderCallBackConfig {@Resourceprivate CachingConnectionFactory cachingConnectionFactory;@Beanpublic RabbitTemplate rabbitTemplate() {RabbitTemplate rabbitTemplate = new RabbitTemplate(cachingConnectionFactory);// 当mandatory设置为true时,若exchange根据自身类型和消息routingKey无法找到一个合适的queue存储消息,//那么broker会调用basic.return方法将消息返还给生产者。// 当mandatory设置为false时,出现上述情况broker会直接将消息丢弃。rabbitTemplate.setMandatory(true);/*** TODO RabbitMQ生产者发送消息确认回调,解决消息可靠性问题* 消息确认回调,确认消息是否到达broker* data:消息唯一标识* ack:确认结果* cause:失败原因*/rabbitTemplate.setConfirmCallback((data, ack, cause) -> {if (ack) {//消息发送成功后,更新数据库消息状态等逻辑log.info("消息发送至exchange成功------>消息唯一标识: {}, 确认状态: {}, 造成原因: {}",data, ack, cause);} else {//信息发送失败,打印日志后,可以根据业务选择是否重发消息log.info("消息发送至exchange失败------>消息唯一标识: {}, 确认状态: {}, 造成原因: {}", data, ack, cause);}});/*** TODO RabbitMQ生产者发送消息失败回调,解决消息可靠性问题* message      消息* replyCode    回应码* replyText    回应信息* exchange     交换机* routingKey   路由键*/rabbitTemplate.setReturnsCallback((res) -> {//若发送失败,打印错误信息,然后可以根据业务选择重发消息log.error("消息发送至queue失败-------->res: {}", JSON.toJSONString(res));});return rabbitTemplate;}}

到这里,生产者推送消息的消息确认调用回调函数已经完毕。
可以看到上面写了两个回调函数,一个叫 ConfirmCallback ,一个叫 RetrunCallback;
那么以上这两种回调函数都是在什么情况会触发呢?

先从总体的情况分析,推送消息存在3种情况:

①消息推送到server,但是在server里找不到交换机
②消息推送到server,找到交换机了,但是没找到队列
③消息推送成功

①消息推送到server,但是在server里找不到交换机
写个测试接口,把消息推送到名为‘non-existent-exchange’的交换机上(这个交换机是没有创建没有配置的):

@GetMapping("/testProviderMessageBack")@ApiOperation(value = "测试生产者消息回调")@ApiOperationSupport(order = 5)public String testProviderMessageBack() {CorrelationData data = new CorrelationData();data.setId("111");rabbitTemplate.convertAndSend("non-existent-exchange", "TestDirectRouting", "测试生产者消息回调",data);return "ok";}

调用接口,查看项目的控制台输出情况(原因里面有说,没有找到交换机'non-existent-exchange'):

结论: ①这种情况触发的是 ConfirmCallback 回调函数

消息发送至exchange失败------>

消息唯一标识: CorrelationData [id=111],

确认状态: false,

造成原因: channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no exchange 'non-existent-exchange' in vhost '/', class-id=60, method-id=40)

②消息推送到server,找到交换机了,但是没找到队列  
这种情况就是需要新增一个交换机,但是不给这个交换机绑定队列,我来简单地在DirectRabitConfig里面新增一个直连交换机,名叫‘lonelyDirectExchange’,但没给它做任何绑定配置操作:

    @BeanDirectExchange lonelyDirectExchange() {return new DirectExchange("lonelyDirectExchange");}

然后写个测试接口,把消息推送到名为‘lonelyDirectExchange’的交换机上(这个交换机是没有任何队列配置的):

@GetMapping("/testProviderMessageBack2")@ApiOperation(value = "测试生产者消息回调2")@ApiOperationSupport(order = 6)public String testProviderMessageBack2() {CorrelationData data = new CorrelationData();data.setId("222");rabbitTemplate.convertAndSend("lonelyDirectExchange", "TestLonelyDirectRouting", "测试生产者消息回调2",data);return "ok";}

消息发送至exchange成功------>

消息唯一标识: CorrelationData [id=222],

确认状态: true,

造成原因: null

消息发送至queue失败-------->

res: {"exchange":"lonelyDirectExchange","message":{"body":"5rWL6K+V55Sf5Lqn6ICF5raI5oGv5Zue6LCDMg==","messageProperties":{"contentEncoding":"UTF-8","contentLength":0,"contentType":"text/plain","deliveryTag":0,"finalRetryForMessageWithNoId":false,"headers":{"spring_returned_message_correlation":"222"},"lastInBatch":false,"priority":0,"projectionUsed":false,"publishSequenceNumber":0,"receivedDeliveryMode":"PERSISTENT"}},"replyCode":312,"replyText":"NO_ROUTE","routingKey":"TestLonelyDirectRouting"}

这种情况下,两个函数都被调用了,

消息是推送成功到交换机了的,所以ConfirmCallback对消息确认情况是true;
而在RetrunCallback回调函数的打印参数里面可以看到,在路由分发给队列的时候,找不到队列,所以报了错误 NO_ROUTE 。
结论:②这种情况触发的是 ConfirmCallback和RetrunCallback两个回调函数。

③消息推送成功
那么测试下,按照正常调用之前消息推送的接口就行,就调用下 /sendDirectMessage接口,可以看到控制台输出:

结论:这种情况触发的是 ConfirmCallback 回调函数。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/110189.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

发现某设备 adb shell ps 没有输出完整信息

某错误示例 并不是都使用 -ef 参数查找都能够返回完整信息&#xff0c;某些版本设备不适用 -ef 也不会返回完整信息。 简单兼容 简单兼容不同版本 Android 设备查找进程列表&#xff0c;没有通过脚本判断 Android 版本&#xff0c;如有兴趣可以自己修改。 :loop adb shell…

使用docker创建minio镜像并上传文件,提供demo

使用docker创建minio镜像并上传文件&#xff0c;提供demo 1. 整体描述2. 环境搭建2.1 windows环境搭建2.2 docker部署 3. spring集成3.1 添加依赖3.2 配置文件3.3 创建config类3.4 创建minio操作类3.5 创建启动类3.6 测试controller 4. 测试操作4.1 demo运行4.2 页面查看4.3 上…

普中51-数码管实验

文章目录 数码管实验**静态数码管实验**动态数码管实验多位数码管简介数码管动态显示原理74HC245 和74HC138芯片介绍74HC245 芯片简介74HC138 芯片简介 代码如下&#xff1a; 数码管实验 如图所示&#xff1a; 从上图可看出&#xff0c;一位数码管的引脚是 10 个&#xff0c;…

c++11的一些新特性

c11 1. {}初始化2. 范围for循环3. final与override4. 右值引用4.1 左值引用和右值引用4.2 左值引用与右值引用比较 5. lambda表达式6. 声明6.1 auto6.2 decltype6.3 nullptr 7. 可变参数模版 1. {}初始化 在C中&#xff0c;使用花括号初始化的方式被称为列表初始化。列表初始化…

C++:初始化列表,static成员,友元,内部类

个人主页 &#xff1a; 个人主页 个人专栏 &#xff1a; 《数据结构》 《C语言》《C》 文章目录 前言一、初始化列表二、static成员三、友元四、内部类总结 前言 本篇博客作为C&#xff1a;初始化列表&#xff0c;static成员&#xff0c;友元&#xff0c;内部类的知识总结。 一…

java面向对象(二)

文章目录 一、方法1.例子2.例子3.例子 二、使用步骤1.举例说明类跟对象&#xff08;高大上&#xff09;2.理解“万事万物皆对象”3.变量在内存中的位置体现4.引用类型的变量5.匿名对象的使用 提示&#xff1a;以下是本篇文章正文内容&#xff0c;下面案例可供参考 一、方法 描…

Mysql开启binlog

本案例基于mysql5.7.16实验 1、在linux中进入mysql查询binlog是否打开&#xff0c;执行命令如下&#xff1a; mysql -u root -p 2、查询binlog是否开启命令如下&#xff0c;如果log_bin为OFF则证明mysql的binlog没有打开 show variables like %log_bin%; 3、退出mysql终端&…

基于 Alpine 环境构建 aspnetcore6-runtime 的 Docker 镜像

关于 Alpine Linux 此处就不再过多讲述&#xff0c;请自行查看相关文档。 .NET 支持的体系结构 下表列出了当前支持的 .NET 体系结构以及支持它们的 Alpine 版本。 这些版本在 .NET 到达支持终止日期或 Alpine 的体系结构受支持之前仍受支持。请注意&#xff0c;Microsoft 仅正…

unity 使用声网(Agora)实现语音通话

第一步、先申请一个声网账号 [Agora官网链接]&#xff08;https://console.shengwang.cn/&#xff09; 第二步在官网创建项目 &#xff0c;选择无证书模式&#xff0c;证书模式需要tokenh和Appld才能通话 第三步 官网下载SDK 然后导入到unity&#xff0c;也可以直接在unity商店…

Spring-Cloud GateWay+Vue 跨域方案汇总

文章目录 一、简介背景和概述 二、前端跨域解决方案Axios跨域CORS跨域 三、后端跨域解决方案反向代理服务器 四、Spring Cloud中的跨域解决方案Gateway网关的跨域配置 五、基于Vue和Spring Cloud的跨域整合实践**这两种配置只需配置一种即可生效&#xff08;前端or后端&#xf…

Zookeeper 启动失败【Cannot open channel to 3 at election address...】

文章目录 完整报错信息解决方法1.检查文件夹权限2.未监听所有IP3.IP映射名称与 ID 不对应 完整报错信息 Cannot open channel to 3 at election address hadoop121/192.168.10.121:3888 java.net.ConnectException 解决方法 1.检查文件夹权限 检查当前用户是否拥有 Zookeep…

【SpringMVC】JSR 303与interceptor拦截器快速入门

目录 一、JSR303 1、什么是JSR 303&#xff1f; 2、为什么要使用JSR 303&#xff1f; 3、JSR 303常用注解 3.1、常用的JSR 303注解 3.2、Validated与Valid区别 3.2.1、Validated 3.2.2、Valid 3.2.3、区别 4、使用案例 4.1、导入依赖 4.2、配置校验规则 4.3、编写…