RabbitMQ消息的应答

消息的应答机制

消费者完成一个任务可能需要一段时间,如果其中一个消费者处理一个长的任务并仅只完成了部分突然它挂掉了,会发生什么情况。RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息。以及后续发送给该消费者的消息,因为它无法接收到。

为了保证消息在发送过程中不丢失,RabbitMQ 引入消息应答机制,消息应答就是:消费者在接收到消息并且处理该消息之后,告诉 RabbitMQ 它已经处理了, RabbitMQ 可以把该消息删除了。

自动应答

消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权衡,因为这种模式如果消息在接收到之前,消费者那边出现连接或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,当然这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率能够处理这些消息的情况下使用。

手动应答

手动应答的好处是可以批量应答并且减少网络拥堵,批量应答的批量范围是channel 上未应答的消息。比如说 channel 上有传送 tag 的消息5,6,7,8 当前 tag 是8 那么此时5-8 的这些还未应答的消息都会被确认收到消息应答。但是实际上还是不建议开启批量应答的。

image-20220530100237882

Channel.basicAck(用于肯定确认)

RabbitMQ 已知道该消息并且成功的处理消息,可以将其丢弃了。

Channel.basicNack(用于否定确认)

不处理该消息了直接拒绝,可以将其丢弃了。

Channel.basicReject(用于否定确认)

与 Channel.basicNack 相比少一个参数。

不处理该消息了直接拒绝,可以将其丢弃了。

消息自动重新入队

如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其重新排队。如果此时其他消费者可以处理,它将很快将其重新分发给另一个消费者。这样,即使某个消费者偶尔死亡,也可以确保不会丢失任何消息。

代码实现

生产者
public class MyProducer {@Testpublic void test() throws Exception {// 队列名称String queue = "xw_queue";String message = "Hello World -> ";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();for (int i = 0; i < 20; i++) {// 发布消息channel.basicPublish("xw_exchange", queue, null, (message + i).getBytes());}}
}
消费者1

开启手动确认后,消费者1如果在处理消息的回调中不确认消息,那么队列中的消息会处于unacked的状态,如果消费者1突然挂掉,那么这些未确认的消费会重新发送给其他消费者。

public class MyConsumer1 {public static void main(String[] args) throws Exception {// 队列名称String queue = "xw_queue";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(queue, true, false, false, null);channel.queueBind("", "xw_exchange", queue);// 配置开启手动应答channel.basicConsume(queue, false, new DefaultConsumer(channel) {@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费完成后手动应答// channel.basicAck(envelope.getDeliveryTag(), false);Thread.sleep(5000);System.out.println("消费者1:接收到消息: " + new String(body));}});}
}
消费者2

消费者2正常消费消息,收到消息后立刻确认。

public class MyConsumer2 {public static void main(String[] args) throws Exception {// 队列名称String queue = "xw_queue";// 创建工厂ConnectionFactory factory = new ConnectionFactory();factory.setVirtualHost("/");factory.setHost("xuewei.world");factory.setUsername("xuewei");factory.setPassword("123456");factory.setPort(5672);// 创建连接和通道Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(queue, true, false, false, null);channel.queueBind("", "xw_exchange", queue);// 配置开启手动应答channel.basicConsume(queue, false, new DefaultConsumer(channel) {@SneakyThrows@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {// 消费完成后手动应答channel.basicAck(envelope.getDeliveryTag(), false);System.out.println("消费者2:接收到消息: " + new String(body));}});}
}

效果展示

image-20220530102415347

image-20220530102445700

此时把消费者1停掉,那么上面这10条为确认的消费会重新入队,发送给另外的消费者。

image-20220530102642150

image-20220530102708467

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/230890.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机器学习:领域自适应学习

训练一个分类器是小问题 上难度 训练数据和测试数据不一致&#xff0c;比如训练数据是黑白的&#xff0c;测试时彩色的&#xff0c;结果准确率非常低。 训练数据和测试数据有点差距的时候&#xff0c;能不能效果也能好呢&#xff1f;这就用到了领域自使用domain adptation 用一…

一款LED段码显示屏驱动芯片方案

一、基本概述 TM1620是一种LED&#xff08;发光二极管显示器&#xff09;驱动控制专用IC,内部集成有MCU数字接口、数据锁存器、LED驱动等电路。本产品质量可靠、稳定性好、抗干扰能力强。 二、基本特性 采用CMOS工艺 显示模式&#xff08;8段6位&#xff5e;10段4位&#xff…

【人工智能Ⅰ】实验2:遗传算法

实验2 遗传算法实验 一、实验目的 熟悉和掌握遗传算法的原理、流程和编码策略&#xff0c;理解求解TSP问题的流程并测试主要参数对结果的影响&#xff0c;掌握遗传算法的基本实现方法。 二、实验原理 旅行商问题&#xff0c;即TSP问题&#xff08;Traveling Salesman Proble…

Spark on yarn 模式的安装与部署

任务描述 本关任务&#xff1a; Spark on YARN 模式的安装与部署。 相关知识 为了完成本关任务&#xff0c;你需要掌握&#xff1a; Spark 部署模式的种类&#xff1b;Spark on YARN 模式的安装。 Spark 部署模式 Spark 部署模式主要分为以下几种&#xff0c;Spark Stand…

Linux高级IO

文章目录 一.IO的基本概念二.钓鱼五人组三.五种IO模型四.高级IO重要概念1.同步通信 VS 异步通信2.阻塞 VS 非阻塞 五.其他高级IO六.阻塞IO七.非阻塞IO 一.IO的基本概念 什么是IO&#xff1f; I/O&#xff08;input/output&#xff09;也就是输入和输出&#xff0c;在著名的冯诺…

MySQL 中的锁(三)

8.7. 死锁和空间锁 一般来说&#xff0c;只要有并发和加锁这两种情况的共同加持下&#xff0c;都会有死锁的身影。 死锁的具体成因&#xff0c;借用我们在并发编程中的内容&#xff1a; 8.7.1. 死锁 8.7.1.1. 概念 是指两个或两个以上的进程在执行过程中&#xff0c;由于竞…

LeetCode(42)有效的字母异位词【哈希表】【简单】

目录 1.题目2.答案3.提交结果截图 链接&#xff1a; 有效的字母异位词 1.题目 给定两个字符串 *s* 和 *t* &#xff0c;编写一个函数来判断 *t* 是否是 *s* 的字母异位词。 **注意&#xff1a;**若 *s* 和 *t* 中每个字符出现的次数都相同&#xff0c;则称 *s* 和 *t* 互为字…

Matlab下载许可证文件 教程(在账号有许可证的前提下)

文章目录 Part.I IntroductionPart.II 许可证文件过期解决方案Chap.I 使用 Internet 自动激活Chap.II 在不使用 Internet 的情况下手动激活 Part.I Introduction 本文主要介绍&#xff0c;在 Mathwork 账号有许可证的前提下&#xff0c;下载许可证的操作流程。 好久没有用 Mat…

Linux小程序之进度条

> 作者简介&#xff1a;დ旧言~&#xff0c;目前大二&#xff0c;现在学习Java&#xff0c;c&#xff0c;c&#xff0c;Python等 > 座右铭&#xff1a;松树千年终是朽&#xff0c;槿花一日自为荣。 > 目标&#xff1a;自己能实现进度条 > 毒鸡汤&#xff1a; > …

【hacker送书第5期】SQL Server从入门到精通(第5版)

第5期图书推荐 内容简介作者简介图书目录参与方式 内容简介 SQL Server从入门到精通&#xff08;第5版&#xff09;》从初学者角度出发&#xff0c;通过通俗易懂的语言、丰富多彩的实例&#xff0c;详细介绍了SQL Server开发所必需的各方面技术。全书分为4篇共19章&#xff0c;…

觉得可视化地图太难做?那你是没用过它!

后台一直有粉丝私信老李&#xff0c;问到现在各大企业对数据可视化越来越看重&#xff0c;但是感觉那些高大上的图表做起来一定很复杂甚至可能还需要一些编程基础&#xff0c;希望老李可以推荐一些简单好上手的数据可视化工具。   作为一名数据分析爱好者&#xff0c;我也尝试…

DDoS高防IP到底是什么?

DDoS高防IP是提供一个带防御的IP&#xff0c;主要是针对网络中的DDoS攻击进行保护&#xff0c;是针对互联网服务器遭受大流量的DDoS攻击后&#xff0c;导致服务不可用的情况下&#xff0c;用户可以通过配置高防IP&#xff0c;将攻击流量引流到高防IP上&#xff0c;从而确保源站…