RabbitMQ 发布确认机制

发布确认模式是避免消息由生产者到RabbitMQ消息丢失的一种手段

发布确认模式

  • 原理说明
  • 实现方式
    • 开启confirm(确认)模式
    • 阻塞确认
    • 异步确认
  • 总结

原理说明

  生产者通过调用channel.confirmSelect方法将信道设置为confirm模式,之后RabbitMQ会返回Confirm.Select-OK命令表示同意生产者将当前信道设置为confirm模式。
  confirm模式下的信道所发送的消息都将被应带ack或者nack一次,不会出现一条消息即被ack又被nack的情况,并且RabbitMQ也并没有对消息被confirm的快慢做出保证,消息被confirm是异步进行。
在这里插入图片描述

  如上图所示为confirm模式下的消息发送过程,其中4和6为异步应答,也就是说4过程并不一定在5之前,也有可能是在下一条消息发送后才会进行上一条消息的应答。
  RabbitMQ 事务和发送确认机制确保的是消息能够正确的发送至RabbitMQ的交换机,如果交换机没有匹配的队列,那么消息也会被丢失。和事务不同的是,发布确认机制是异步进行的,因此在性能上发布确认模式将更加优秀,需要注意的是:事务和确认机制是互斥的,不能共存
  事务机制和发布确认机制都存在以下注意点:

  • 如果消息需要持久化并且存在队列,则在消息入队并且持久化后进行返回事务提交成功或者应答消息。
  • 如果消息不需要持久化但是存在队列,则在消息入队后返回事务提交成功或者应答消息。
  • 如果消息不可路由到队列中,则在路由失败后返回事务提交成功或者应答消息。

  上文中一直强调的时发布确认针对发布发送到RabbitMQ中的交换机进行保证,但消息实际是否能入队发布确认机制并不能提供保证,因此还需要和mandatory参数配合使用。

实现方式

  RabbitMQ的发布确认机制可以分为三种实现方式:阻塞等待确认、批量阻塞等待确认、异步确认。
阻塞等待确认:每当消息发送后,发送者都阻塞的等待应答消息。这种实现方式将无法体现发布确认模式的异步性能优势。
批量阻塞确认:批量阻塞确认类似于阻塞等待确认,区别在于批量阻塞确认并不会针对每条消息进行阻塞等待,他会针对一些消息进行统一阻塞等待应答消息。这种实现方式将同步和异步结合起来进行使用,对应答性能有一定的提升。
异步应答:实现一个监听器的方式接收应答消息,应答消息的处理逻辑不会影响消息的发送,消息的应答和消息发送是异步进行的,他们并不直接相互干扰。
上面对三种确认方式进行简单说明,下面将分别介绍发布确认机制的实现方式。

开启confirm(确认)模式

  确认模式的开启是针对信道设置的,一旦信道进入了confirm模式,所有在该信道上面发布的消息都会被指派唯一的ID,RabbitMQ也将针对该信道发送的所有消息都进行应答。
  RabbitMQ回传给生产者的确认消息中的deliverryTag包含了确认消息的序号,但在使用(批量)阻塞确认方式进行实现的时候该消息序号无意义。开启confirm模式仅需要以下代码进行实现即可:

channel.confirmSelect();

阻塞确认

  阻塞确认的方式依赖于channel.waitForConfirms()方法,该方法如下所示:

    /*** Wait until all messages published since the last call have been* either ack'd or nack'd by the broker.  Note, when called on a* non-Confirm channel, waitForConfirms throws an IllegalStateException.* @return whether all the messages were ack'd (and none were nack'd)* @throws java.lang.IllegalStateException*/boolean waitForConfirms() throws InterruptedException;

  自从上次调用该方法后直到所有发送的消息都被应答后返回所有消息的应答结果,如果所有发送的消息应答结果都是成功则返回true,一旦存在任何一条消息应答失败则返回false。
  根据该方法的描述可知,可以通过该方法实现阻塞等待确认和批量阻塞确认两种方案,区别仅在于是发送一条消息调用一次该方法还是发送一批消息后调用一次这个方法。
  阻塞等待确认的方式如下代码所示:

//发送消息
channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
// 如果发送失败则进行该条消息的重新发送
if(!channel.waitForConfirms()){channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
}

 阻塞批量确认的方式如下代码所示:

        // 存储未应带消息队列List<String> messages = new ArrayList<>();for (int i = 1; i < 20000 ;  i++){String msg = String.valueOf(i);messages.add(msg);channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());// 每发送十条消息进行一次确认if(i > 0 && i % 10 == 0 ){// 如果确认不通过则将消息重新发送if(!channel.waitForConfirms()){for (String e : messages) {channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,e.getBytes());}}else{// 如果确认成功则将这些消息从未应答队列中移除messages.clear();}}}

异步确认

  客户端Channel提供了addConfirmListener方法,该可以添加ConfirmListener这个回调接口,该接口包含两个方法:handleAck和handleNack,分别用来处理饭hi的Ack和Nack,这两个方法都将返回一个参数deliveryTag(消息的唯一有序序号)和一个boolean型参数multiple,如果该参数为true表示自该消息之前的所有消息RabbitMQ服务都已经做出了应答。我们可以通过该值实现具体业务的发布确认。

/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker.  Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
* For a lambda-oriented syntax, use {@link ConfirmCallback}.
*/
public interface ConfirmListener {void handleAck(long deliveryTag, boolean multiple)throws IOException;void handleNack(long deliveryTag, boolean multiple)throws IOException;
}

  异步确认的方式实现起来比较复杂,在生产者端需要维护一个消息队列,如果消息应答成功则将该消息从队列中移除,如果消息应答失败则将该消息再重新发送或进行其他业务处理。该逻辑伪码如下所示:

        // 存储未确认消息,其中key为消息序号,value为消息实体HashMap<Long,String> msgMap = new HashMap<>();channel.addConfirmListener(new ConfirmListener() {@Overridepublic void handleAck(long deliveryTag, boolean multiple) throws IOException {msgMap.remove(deliveryTag);}/*** 如果消息应带结果为nack则重新发送该消息* @param deliveryTag* @param multiple* @throws IOException*/@Overridepublic void handleNack(long deliveryTag, boolean multiple) throws IOException {String msg = msgMap.get(deliveryTag);if(msg != null){channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}}});for (int i = 1; i < 20000 ;  i++){String msg = String.valueOf(i);// 将消息序号和消息存储map中msgMap.put(channel.getNextPublishSeqNo(),msg);channel.basicPublish(EXCHANGE_NAME ,ROUTING_KEY,MessageProperties.PERSISTENT_TEXT_PLAIN,msg.getBytes());}

  上述代码使用了map存储消息序号和消息实体,这种存储方式应该会存在风险,由于监听器和消息发送过程是异步进行了,因此可能会存在线程安全的问题,HashMap是非线程安全的。

总结

  发布确认模式是为我们解决消息自生产者发送到RabbitMQ交换机过程中消息丢失的问题的,这一场景需求我们也可以通过事务机制实现。发布确认模式和事务机制比较如下表所示:

比较事务机制发布确认机制
实现方式通过AMQP协议层面实现轻量级实现,采用RabbitMQ应答机制
命令详解Tx.Select
Basic.Publish
Tx.Commit
Commit.OK
Basic.Publish
Basic.Ack
性能同步,性能较慢可异步实现也可同步实现,性能快,AMQP命令交互少
消息到达队列时机事务提交后消息才会进入队列,消息入队存在滞后性消息发送后就进入队列,发布确认模式不影响消息进入队列时机
事务提交成功或消息应答时机消息被交换机处理完成后,或消息不可达同事务
实现复杂度简单相对复杂
适合场景批量发送消息,实现批量消息的原子性和一致性确保消息发送到交换机

  发布确认模式的具体实现可以划分为三种:阻塞等待、批量确认、异步确认,这三者的比较如下表所示:

比较内容阻塞等待批量等待异步确认
性能
实现复杂度
确认范围每条消息批量消息每条消息
是否可以精准确认每条消息

  根据上述内容,我们在实现避免消息自生产者到交换机丢失的机制时建议使用发布确认模式的异步确认,因为异步确认性能最高,并且可以准确的得到被应答的消息的序号,有助于我们进行后续逻辑处理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/61317.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Linux下 时间戳的转化

Linux下一般用date 记录当前时间&#xff0c;尤其是我们需要保存测试log的时候&#xff0c;或者设计一个跑多长时间的脚本都需要时间戳。下面看一下平时最常用的几种写法 1 date “%Y-%m-%d %H:%M” 显示具体时间 2 修改时间 date -s 3 date %s :当前时间的时间戳 显示具体时…

ruoyi-cloud微服务新建子模块

ruoyi-cloud微服务新建子模块 1、复制system模块 直接复制 modules下面已有的system模块&#xff0c;改名为 test 2、在modules下的 pom.xml文件中添加子模块 3、进入 test模块修改 pom.xml 把原有的system 修改成test 4、修改对应的包名、目录名和启动应用程序为test 5、修…

在vue中使用echarts

在Vue中使用ECharts可以按照以下步骤进行&#xff1a; 1. 安装ECharts&#xff1a; 在Vue项目的根目录下&#xff0c;执行以下命令安装ECharts依赖&#xff1a; npm install echarts --save2.引入echart import * as echarts from echarts引入echart有全部引入和按需引入&a…

unity修改单个3D物体的重力的大小该怎么处理呢?

在Unity中修改单个3D物体的重力大小可以通过以下步骤实现&#xff1a; 创建一个新的C#脚本来控制重力&#xff1a; 首先&#xff0c;创建一个新的C#脚本&#xff08;例如&#xff1a;GravityModifier.cs&#xff09;并将其附加到需要修改重力的3D物体上。在脚本中&#xff0c…

Vue3 第五节 一些组合式API和其他改变

1.provide和inject 2.响应式数据判断 3.Composition API的优势 4.新的组件 5.其他改变 一.provide和inject 作用&#xff1a;实现祖与后代组件间通信 套路&#xff1a;父组件有一个provide选项来提供数据&#xff0c;后代组件有一个inject选项来开始使用这些数据 &…

【Maven】依赖范围、依赖传递、依赖排除、依赖原则、依赖继承

【Maven】依赖范围、依赖传递、依赖排除、依赖原则、依赖继承 依赖范围 依赖传递 依赖排除 依赖原则 依赖继承 依赖范围 在Maven中&#xff0c;依赖范围&#xff08;Dependency Scope&#xff09;用于控制依赖项在编译、测试和运行时的可见性和可用性。通过指定适当的依赖…

uniapp-原生地图截屏返回base64-进行画板编辑功能

一、场景 vue写uniapp打包安卓包&#xff0c;实现原生地图截屏&#xff08;andirod同事做的&#xff09;-画板编辑功能 实现效果&#xff1a; 二、逻辑步骤简略 1. 由 原生地图nvue部分&#xff0c;回调返回 地图截屏生成的base64 数据&#xff0c; 2. 通过 uni插件市场 im…

【pinia】Pinia入门和基本使用:

文章目录 一、 什么是pinia二、 创建空Vue项目并安装Pinia1. 创建空Vue项目2. 安装Pinia并注册 三、 实现counter四、 实现getters五、 异步action六、 storeToRefs保持响应式解构七、基本使用&#xff1a;【1】main.js【2】store》index.js【3】member.ts 一、 什么是pinia P…

flink如何监听kafka主题配置变更

背景&#xff1a; 从前一篇文章我们知道flink消费kafka主题时是采用的手动assign指定分区的方式&#xff0c;这种消费方式是不处理主题的rebalance操作的&#xff0c;也就是消费者组中即使有消费者退出或者进入也是不会触发消费者所消费的分区的&#xff0c;那么疑问就来了&am…

PHP实现在线进制转换器,10进制,2、4、8、16、32进制转换

1.接口文档 2.laravel实现代码 /*** 进制转换计算器* return \Illuminate\Http\JsonResponse*/public function binaryConvertCal(){$ten $this->request(ten);$two $this->request(two);$four $this->request(four);$eight $this->request(eight);$sixteen …

Redis的AOF持久化

除了RDB持久化功能之外&#xff0c;Redis还提供了AOF持久化功能。与RDB 持久化通过保存数据库中的键值对来记录数据库状态不同&#xff0c;AOF持久化是通过保存Redis服务器所执行的写命令来记录数据库状态的&#xff0c;如下图所示。 举个例子&#xff0c;如果我们对空白的数据…

什么是DNS欺骗及如何进行DNS欺骗

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、什么是 DNS 欺骗&#xff1f;二、开始1.配置2.Ettercap启动3.操作 总结 前言 我已经离开了一段时间&#xff0c;我现在回来了&#xff0c;我终于在做一个教…