RabbitMQ 死信队列应用

1. 概念

死信队列(Dead Letter Queue)是在消息队列系统中的一种特殊队列,用于存储无法被消费的消息。消息可能会因为多种原因变成“死信”,例如消息过期、消息被拒绝、消息队列长度超过限制等。当消息变成“死信”时,它们会被路由到死信队列中,以便进行进一步处理或分析。 死信队列能够帮助系统进行消息跟踪、监控和处理异常情况,是消息队列系统中的重要组成部分。

2. 应用场景

死信队列在消息队列系统中有多种应用场景,包括但不限于以下几个方面:

  • 延迟消息处理:实现延迟消息投递,例如实现消息的定时投递、消息重试机制等。

  • 任务调度:用于实现任务调度系统,例如延迟执行任务、失败重试任务等。

  • 异常处理:处理消息消费失败或超时的情况,对异常消息进行统一处理。

  • 业务流程控制:实现业务流程中的状态控制和超时处理,例如订单超时取消、支付超时处理等。

  • 监控和统计:对异常消息进行统计和分析,用于系统性能监控和问题排查。

这些应用场景展示了死信队列的灵活性和实用性,在实际系统开发中具有广泛的应用价值。

3. 造成消息进入死信队列的原因

消息成为死信的原因有以下几种:

  • 消息被拒绝(basic.reject或basic.nack),并且requeue标志被设置为false。若参数requeue为true,则表示还可以将此跳消息重新塞回普通队列,若为false则消息被拒绝后直接进入死信队列。

  • 消息过期。在生产者设置生产时设置,若消费者未在过期时间内消费消息,则消息被转发到死信队列中。("x-message-ttl")

  • 队列达到最大长度。当普通队列中消息堆积数量长度达到了maxLength,则会将新接收的消息转发到死信队列中去,从而避免消息丢失。

4. 死信队列工作流程图

5. 代码示例

5.1 引入依赖

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId><version>2.7.15</version>
</dependency>

5.2 RabbitMQ配置

@Configuration
public class RabbitConfig {/*** 死信队列消息模型构建----------------------------------------------------------------------------------**/// 创建普通队列@Beanpublic Queue basicQueue() {Map<String, Object> params = new HashMap<>(8);// x-dead-letter-exchange 声明了队列里的死信转发到的DLX名称,params.put("x-dead-letter-exchange", Exchange.DEMO_DEAD_LETTER_EXCHANGE);// x-dead-letter-routing-key 声明了这些死信在转发时携带的 routing-key 名称。params.put("x-dead-letter-routing-key", RoutingKey.DEMO_DEAD_ROUTING_KEY);// 注意这里是毫秒单位,这里我们给10秒params.put("x-message-ttl", 10*1000);return new Queue(MyQueue.DEMO_CONSUMER_QUEUE, true, false, false, params);}//创建“基本消息模型”的基本交换机,面向生产者@Beanpublic TopicExchange basicExchange() {//创建并返回基本交换机实例return new TopicExchange(Exchange.DEMO_BASIC_NORMAL_EXCHANGE, true, false);}//创建“基本消息模型”的基本绑定(基本交换机+基本路由),面向生产者@Beanpublic Binding basicBinding() {//创建并返回基本消息模型中的基本绑定(注意这里是正常交换机跟死信队列绑定在一定,不叫死信路由)return BindingBuilder.bind(basicQueue()).to(basicExchange()).with(RoutingKey.DEMO_ROUTING_KEY);}// 创建死信交换机@Beanpublic TopicExchange deadLetterExchange() {//创建并返回死信交换机实例return new TopicExchange(Exchange.DEMO_DEAD_LETTER_EXCHANGE, true, false);}// 创建第二个中转站// 创建死信队列@Beanpublic Queue deadLetterQueue() {return new Queue(MyQueue.DEMO_DEAD_LETTER_QUEUE, true);}// 创建死信路由及其绑定@Beanpublic Binding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(RoutingKey.DEMO_DEAD_ROUTING_KEY);}public static class Exchange {public static final String DEMO_BASIC_NORMAL_EXCHANGE = "demo.basic.exchange";public static final String DEMO_DEAD_LETTER_EXCHANGE = "demo.dead.letter.exchange";}public static class RoutingKey {//交换机与报表队列绑定的RoutingKeypublic static final String DEMO_ROUTING_KEY = "demo.basic.routing.key";public static final String DEMO_DEAD_ROUTING_KEY = "demo.dead.routing.key";}/*** 队列名称* @author peng.zhang* @date 2024/01/30*/public static class MyQueue {//报表队列名称public static final String DEMO_CONSUMER_QUEUE = "demo.basic.queue";//死信队列名称public static final String DEMO_DEAD_LETTER_QUEUE = "demo.dead.letter.queue";}
}

5.3 消息生产者

@RestController
@RequestMapping("/test")
@Slf4j
public class TestController {@Resourceprivate RabbitTemplate rabbitTemplate;/*** 发送消息到死信队列*/@PostMapping("/testDeadQueue")public String testDeadQueue() {// 设置生产者到交换机的确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {log.info("correlationData:{}, ack:{}, cause:{}", JSON.toJSONString(correlationData), ack, cause);});// 设置消息未被队列接收时的返回回调rabbitTemplate.setReturnCallback((message, replyCode, replyText, ex, routing) -> {log.info("message:{}, replyCode:{}, replyText:{}, exchange:{}, routingKey:{}", JSON.toJSONString(message),replyCode, replyText, ex, routing);});// 生成关联数据并发送消息到交换机CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 消息内容String messageBody = StrUtil.format("this message send at {}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"));rabbitTemplate.convertAndSend(RabbitConfig.Exchange.DEMO_BASIC_NORMAL_EXCHANGE, RabbitConfig.RoutingKey.DEMO_ROUTING_KEY, messageBody, correlationData);log.info(">>>>>{}, 发送消息:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), messageBody);return "OK";}}

5.4 消息消费者

@Component
@Slf4j
public class DeadLetterConsumer {/*** 监听 DEMO_CONSUMER_QUEUE 并处理传入的消息。* 为测试目的抛出 IOException 以模拟异常。** @param messageBody 消息负载* @param headers     消息头* @param channel     用于消息确认的通道* @throws IOException 如果抛出异常*/@RabbitListener(queues = RabbitConfig.MyQueue.DEMO_CONSUMER_QUEUE)@RabbitHandlerpublic void testBasicQueueAndThrowsException(@Payload String messageBody, @Headers Map<String, Object> headers, Channel channel) throws IOException {/*** Delivery Tag 用来标识信道中投递的消息。RabbitMQ 推送消息给 Consumer 时,会附带一个 Delivery Tag,* 以便 Consumer 可以在消息确认时告诉 RabbitMQ 到底是哪条消息被确认了。* RabbitMQ 保证在每个信道中,每条消息的 Delivery Tag 从 1 开始递增。*/Long tag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);log.info(">>>>>{} 普通队列消费, tag = {}, 消息内容:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), tag, messageBody);/***  multiple 取值为 false 时,表示通知 RabbitMQ 当前消息被确认*  如果为 true,则额外将比第一个参数指定的 delivery tag 小的消息一并确认*/// ACK,确认一条消息已经被消费
//        channel.basicAck(deliveryTag, false);// 对应的业务操作。。。。。// doBusiness();// 模拟消息拒绝channel.basicNack(tag, false, false);}/*** 处理业务逻辑*/private void doBusiness() {System.out.println("here do some business code");}/*** 监听死信队列并处理消息。** @param data    消息内容* @param tag     消息标签* @param channel 通道*/@RabbitListener(queues = RabbitConfig.MyQueue.DEMO_DEAD_LETTER_QUEUE)@RabbitHandlerpublic void fromDeadLetter(@Payload String data, @Header(AmqpHeaders.DELIVERY_TAG) long tag, Channel channel) {log.info(">>>>>{} 死信队列消费, tag = {}, 消息内容:{}", DateUtil.format(LocalDateTime.now(), "yyyy-MM-dd HH:mm:ss"), tag, data);// 对应的业务操作。。。。。}
}

5.5 YML配置

spring:rabbitmq:username: rabbitmqpassword: rabbitmqport: 5672host: 127.0.0.1#publisher-confirm-type参数有三个可选值:#SIMPLE:会触发回调方法,相当于单个确认(发一条确认一条)。#CORRELATED:消息从生产者发送到交换机后触发回调方法。#NONE(默认):关闭发布确认模式。publisher-confirm-type: correlatedtemplate:receive-timeout: 1800000reply-timeout: 1800000retry:enabled: falselistener:direct:retry:enabled: truedefault-requeue-rejected: falsesimple:retry:# 是否开启消费者重试(为false时关闭消费者重试,这时消费端代码异常会一直重复收到消息)enabled: true# 最大重试次数max-attempts: 1# 重试间隔时间(单位毫秒)initial-interval: 10000# 重试最大时间间隔(单位毫秒)max-interval: 300000# 应用于前一重试间隔的乘法器multiplier: 5default-requeue-rejected: false

5.6 控制台输出

从控制台可以看出,消息被拒绝后,大概10秒后死信队列消息被消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/444761.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

《高性能MySQL》

文章目录 一、创建1. 磁盘1.1 页、扇区、寻道、寻址、硬盘性能 2. 行结构row_format2.1 Compact紧凑2.1.1 行溢出2.1.2 作用2.1.3 内容1-额外信息1、变长字段长度2、NULL值列表3、记录头信息 2.1.4 内容2-真实数据4、表中列的值5、transaction_id6、roll_point7、row_id 2.2 dy…

MD5算法:高效安全的数据完整性保障

摘要&#xff1a;在数字世界中&#xff0c;确保数据完整性和安全性至关重要。消息摘要算法就是一种用于实现这一目标的常用技术。其中&#xff0c;Message Digest Algorithm 5&#xff08;MD5&#xff09;算法因其高效性和安全性而受到广泛关注。本文将详细介绍MD5算法的优缺点…

ElementUI 组件:Container 布局容器实例

ElementUI安装与使用指南 Container 布局容器 点击下载learnelementuispringboot项目源码 效果图 el-container-example.vue&#xff08;Container 布局容器实例&#xff09;页面效果图 项目里el-container-example.vue代码 <script> export default {name: el_cont…

<网络安全>《12 数据库安全审计系统》

1 概念 数据库安全审计系统通过对用户访问数据库行为的记录、分析和汇报&#xff0c;来帮助用户事后生成合规报告、事故追根溯源&#xff0c;同时通过大数据搜索技术提供高效查询审计报告&#xff0c;定位事件原因&#xff0c;以便日后查询、分析、过滤&#xff0c;实现加强内…

SpringBoot使用Rabbit详解含完整代码

1. 摘要 本文将详细介绍如何在Spring Boot应用程序中集成和使用RabbitMQ消息队列。RabbitMQ是一个开源的消息代理和队列服务器&#xff0c;用于通过轻量级和可靠的消息在应用程序或系统之间进行异步通信。本文将通过步骤说明、代码示例和详细注释&#xff0c;指导读者在Spring…

代码重构的招式

背景介绍 最近在团队工作中花了不少心思主导建设了测试平台&#xff0c;前期的建设思路是能用就行&#xff0c;随着建设的深入&#xff0c;逐渐需要学习下代码架构设计方面的内容了。于是参加了公司组织的代码重构与模式的培训&#xff0c;通过培训&#xff0c;感觉收获颇丰&a…

往年国自然项目信息查看

1 国自然申报系统 进去可以看到摘要。 2 letpub

(1)从 AGP 4.1.2 升级到 7.5.1 我遇到了什么问题

AGP 升级问题 &#xff08;1&#xff09;Could not get unknown property ‘project’ for settings&#xff0c;on project.buildscript 问题 Could not get unknown property ‘project’ for settings ‘AGP1’ of type org.gradle.initialization.DefaultSettings. agp4 …

备战蓝桥杯---数据结构与STL应用(入门4)

本专题主要是关于利用优先队列解决贪心选择上的“反悔”问题 话不多说&#xff0c;直接看题&#xff1a; 下面为分析&#xff1a; 很显然&#xff0c;我们在整体上以s[i]为基准&#xff0c;先把士兵按s[i]排好。然后&#xff0c;我们先求s[i]大的开始&#xff0c;即规定选人数…

牛客——字符串(尺取法与滑动窗口)

链接&#xff1a;登录—专业IT笔试面试备考平台_牛客网 来源&#xff1a;牛客网 题目描述 小N现在有一个字符串S。他把这这个字符串的所有子串都挑了出来。一个S的子串T是合法的&#xff0c;当且仅当T中包含了所有的小写字母。小N希望知道所有的合法的S的子串中&#xff0c…

微信小程序(二十三)获取页面栈及当前页面实例

注释很详细&#xff0c;直接上代码 上一篇 新增内容&#xff1a; 1.页面栈的定义 2.获取当前页面实例 页面栈 当我们从A页面跳到B页面再跳到C页面时&#xff0c;页面栈则是由三个页面的实例组成的数组&#xff0c;A在下标为0的数组中&#xff0c;C在下标为2的数组中 当然&#…

基于二值化图像转GCode的螺旋扫描实现

基于二值化图像转GCode的螺旋扫描实现 什么是双向扫描螺旋扫描代码示例 基于二值化图像转GCode的螺旋扫描实现 什么是螺旋扫描 螺旋扫描&#xff08;Spiral Scanning&#xff09;是激光雕刻中一种特殊的扫描方式&#xff0c;其特点是激光头按照螺旋形状逐渐向外移动&#xf…