【消息利器RabbitMQ】RabbitMQ常用内容浅析

news/2025/3/28 8:28:15/文章来源:https://www.cnblogs.com/sun-10387834/p/18784667

以下是一篇关于 RabbitMQ 的博客内容,涵盖了从基础到死信队列的实现,以及 RabbitMQ 其他常用知识点的补充。内容逻辑清晰,代码完整,适合直接发布。


使用 RabbitMQ 实现消息队列与死信队列:从基础到高级

在现代分布式系统中,消息队列(如 RabbitMQ)是解耦和异步通信的重要工具。本文将基于 Spring Boot 和 RabbitMQ,从基础到高级,逐步实现以下功能:

  1. 发送消息到队列
  2. 发送消息到交换机
  3. 消息可靠性机制
    • 消息确认机制(Publisher Confirms)。
    • 消息持久化(Durable Queues and Messages)。
    • 消费者手动确认(Manual Acknowledgement)。
  4. 死信队列(Dead Letter Queue, DLQ):处理无法被正常消费的消息。

我们将使用一个简单的 User 对象作为消息内容,User 类包含 nameage 字段。


1. 创建 User

首先,在 service-aservice-b 中创建 User 类。

package com.example.common;import java.io.Serializable;public class User implements Serializable {private String name;private int age;// 必须有无参构造函数public User() {}public User(String name, int age) {this.name = name;this.age = age;}// Getter 和 Setterpublic String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "User{name='" + name + "', age=" + age + "}";}
}

2. 发送消息到队列

2.1 配置队列

service-a 中配置一个队列。

package com.example.servicea.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQQueueConfig {@Beanpublic Queue userQueue() {return new Queue("userQueue", true); // 第二个参数表示持久化}
}

2.2 发送消息

service-a 中发送 User 对象到队列。

package com.example.servicea.service;import com.example.common.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class QueueMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendUserToQueue(User user) {rabbitTemplate.convertAndSend("userQueue", user);System.out.println("Sent user to queue: " + user);}
}

2.3 接收消息

service-b 中监听队列并接收 User 对象。

package com.example.serviceb.service;import com.example.common.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class QueueMessageReceiver {@RabbitListener(queues = "userQueue")public void receiveUserFromQueue(User user) {System.out.println("Received user from queue: " + user);}
}

3. 发送消息到交换机

3.1 配置交换机和队列

service-a 中配置一个 Direct Exchange 并绑定队列。

package com.example.servicea.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQExchangeConfig {@Beanpublic DirectExchange userExchange() {return new DirectExchange("userExchange", true, false); // 第二个参数表示持久化}@Beanpublic Queue userExchangeQueue() {return new Queue("userExchangeQueue", true); // 第二个参数表示持久化}@Beanpublic Binding bindingUserExchangeQueue(DirectExchange userExchange, Queue userExchangeQueue) {return BindingBuilder.bind(userExchangeQueue).to(userExchange).with("user.routing.key");}
}

3.2 发送消息

service-a 中发送 User 对象到交换机。

package com.example.servicea.service;import com.example.common.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class ExchangeMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendUserToExchange(User user) {rabbitTemplate.convertAndSend("userExchange", "user.routing.key", user);System.out.println("Sent user to exchange: " + user);}
}

3.3 接收消息

service-b 中监听队列并接收 User 对象。

package com.example.serviceb.service;import com.example.common.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class ExchangeMessageReceiver {@RabbitListener(queues = "userExchangeQueue")public void receiveUserFromExchange(User user) {System.out.println("Received user from exchange: " + user);}
}

4. 消息可靠性

4.1 消息确认机制(Publisher Confirms)

application.yml 中启用 Publisher Confirms 和 Returns。

spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlated  # 启用 Publisher Confirmspublisher-returns: true            # 启用 Publisher Returns

service-a 中配置 RabbitTemplate 以支持 Publisher Confirms 和 Returns。

package com.example.servicea.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 启用 Publisher Confirms 和 ReturnsrabbitTemplate.setMandatory(true);// 设置确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message confirmed with correlation data: " + correlationData);} else {System.out.println("Message failed with cause: " + cause);}});// 设置返回回调rabbitTemplate.setReturnsCallback(returned -> {System.out.println("Returned message: " + returned.getMessage());System.out.println("Reply code: " + returned.getReplyCode());System.out.println("Reply text: " + returned.getReplyText());System.out.println("Exchange: " + returned.getExchange());System.out.println("Routing key: " + returned.getRoutingKey());});return rabbitTemplate;}
}

4.2 消息持久化

在配置队列和交换机时启用持久化。

@Bean
public Queue userQueue() {return new Queue("userQueue", true); // 第二个参数表示持久化
}@Bean
public DirectExchange userExchange() {return new DirectExchange("userExchange", true, false); // 第二个参数表示持久化
}

在发送消息时设置消息为持久化。

import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import com.fasterxml.jackson.databind.ObjectMapper;@Service
public class ReliableMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate ObjectMapper objectMapper;public void sendUserWithConfirmation(User user) throws IOException {// 生成唯一的 CorrelationDataCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 设置消息属性MessageProperties properties = new MessageProperties();properties.setContentType("application/json"); // 明确设置 content-typeproperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 持久化消息byte[] body = objectMapper.writeValueAsBytes(user);Message message = new Message(body, properties);// 发送消息rabbitTemplate.send("userExchange", "user.routing.key", message, correlationData);System.out.println("Sent user with confirmation: " + user);}
}

4.3 消费者手动确认

service-bapplication.yml 中启用手动确认。

spring:rabbitmq:listener:simple:acknowledge-mode: manual

service-b 中实现手动确认逻辑。

package com.example.serviceb.service;import com.example.common.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;import java.io.IOException;@Service
public class ManualAckReceiver {@RabbitListener(queues = "userQueue")public void receiveUser(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {System.out.println("Received user from queue: " + user);// 手动确认消息channel.basicAck(tag, false);} catch (Exception e) {// 拒绝消息并重新入队channel.basicNack(tag, false, true);}}
}

5. 死信队列(Dead Letter Queue, DLQ)

5.1 配置死信队列

service-a 中配置死信队列和普通队列。

package com.example.servicea.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQDLXConfig {// 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normalExchange");}// 普通队列,配置死信交换机@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normalQueue").deadLetterExchange("dlxExchange") // 指定死信交换机.deadLetterRoutingKey("dlx.routing.key") // 指定死信路由键.build();}// 绑定普通队列到普通交换机@Beanpublic Binding bindingNormalQueue(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("normal.routing.key");}// 死信交换机@Beanpublic DirectExchange dlxExchange() {return new DirectExchange("dlxExchange");}// 死信队列@Beanpublic Queue dlqQueue() {return new Queue("dlqQueue");}// 绑定死信队列到死信交换机@Beanpublic Binding bindingDlqQueue(DirectExchange dlxExchange, Queue dlqQueue) {return BindingBuilder.bind(dlqQueue).to(dlxExchange).with("dlx.routing.key");}
}

5.2 发送消息到普通队列

service-a 中发送消息到普通队列。

package com.example.servicea.service;import com.example.common.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class NormalMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendUserToNormalQueue(User user) {rabbitTemplate.convertAndSend("normalExchange", "normal.routing.key", user);System.out.println("Sent user to normal queue: " + user);}
}

5.3 消费普通队列的消息

service-b 中消费普通队列的消息,并模拟消息处理失败。

package com.example.serviceb.service;import com.example.common.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;import java.io.IOException;@Service
public class NormalMessageReceiver {@RabbitListener(queues = "normalQueue")public void receiveUserFromNormalQueue(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {System.out.println("Received user from normal queue: " + user);if (user.getName().equals("Bob")) {throw new RuntimeException("Simulated processing failure");}// 手动确认消息channel.basicAck(tag, false);} catch (Exception e) {// 拒绝消息并重新入队channel.basicNack(tag, false, false); // 不重新入队,消息会被路由到死信队列System.out.println("Message rejected and sent to DLQ: " + user);}}
}

5.4 消费死信队列的消息

service-b 中消费死信队列的消息。

package com.example.serviceb.service;import com.example.common.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class DLQMessageReceiver {@RabbitListener(queues = "dlqQueue")public void receiveUserFromDLQ(User user) {System.out.println("Received user from DLQ: " + user);}
}

6. 测试死信队列

6.1 发送消息

service-a 中发送消息到普通队列:

normalMessageSender.sendUserToNormalQueue(new User("Alice", 25));
normalMessageSender.sendUserToNormalQueue(new User("Bob", 30));

6.2 观察日志

  • 正常消息(Alice)会被消费并确认:
    Received user from normal queue: User{name='Alice', age=25}
    
  • 失败消息(Bob)会被拒绝并路由到死信队列:
    Received user from normal queue: User{name='Bob', age=30}
    Message rejected and sent to DLQ: User{name='Bob', age=30}
    Received user from DLQ: User{name='Bob', age=30}
    

7. 总结

通过以上步骤,我们实现了 RabbitMQ 的死信队列功能:

  1. 普通队列:绑定到普通交换机,配置了死信交换机和路由键。
  2. 死信队列:绑定到死信交换机,用于存储无法被正常消费的消息。
  3. 消息处理
    • 正常消息被消费并确认。
    • 失败消息被拒绝并路由到死信队列。
  4. 死信队列消费:单独消费死信队列中的消息。

这种机制非常适合处理异常情况下的消息,确保系统的可靠性和可维护性。


8. 其他常用知识点

8.1 消息过期(TTL)

可以为队列或消息设置过期时间(Time-To-Live, TTL)。过期后的消息会被路由到死信队列。

设置队列 TTL:

@Bean
public Queue normalQueue() {return QueueBuilder.durable("normalQueue").deadLetterExchange("dlxExchange").deadLetterRoutingKey("dlx.routing.key").ttl(60000) // 设置队列中消息的 TTL 为 60 秒.build();
}

设置消息 TTL:

MessageProperties properties = new MessageProperties();
properties.setExpiration("60000"); // 设置消息的 TTL 为 60 秒
Message message = new Message(body, properties);
rabbitTemplate.send("normalExchange", "normal.routing.key", message);

8.2 优先级队列

可以为队列设置优先级,优先级高的消息会被优先消费。

设置优先级队列:

@Bean
public Queue priorityQueue() {return QueueBuilder.durable("priorityQueue").maxPriority(10) // 设置最大优先级为 10.build();
}

发送优先级消息:

MessageProperties properties = new MessageProperties();
properties.setPriority(5); // 设置消息优先级为 5
Message message = new Message(body, properties);
rabbitTemplate.send("priorityExchange", "priority.routing.key", message);

希望这篇博客对你有所帮助!如果有任何问题或建议,欢迎在评论区留言。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/902353.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Hook(钩子技术)

一.介绍 1.当代码执行到某行时,获取寄存器值和内存里的值,进行调试分析,例如hook明文包. 2.当代码执行到某行时,插入想执行的代码.例如迅雷拦截发包函数. 3.当代码执行到某行时,修改寄存器,达到某些篡改目的. 拿FishingKit这道题举例就是本来运行流程是: a--->b 而使用了H…

2024年1月Java项目开发指南20:windows下使用Nignx部署应用

命令 启动start nginx重新加载配置文件 nginx -s reload nginx -t修改配置后执行上面两个语句下载地址 https://nginx.org/en/download.html 部署Vue项目 1 打包Vue项目 得到dist文件件 2 启动Nginx,访问localhost 注意:Nginx需要占用80端口。 启动后访问localhost,成功后如…

Z3-solve 求解器(SMT求解器)解方程:

Int(name, ctx=None),创建一个整数变量,name是名字 Ints (names, ctx=None),创建多个整数变量,names是空格分隔名字 IntVal (val, ctx=None),创建一个整数常量,有初始值,没名字。 对于实数类型的API与整数类型一致,向量(BitVec)则稍有区别: Bitvec(name,bv,ctx=None),…

CSS 如何设置父元素的透明度而不影响子元素的透明度

CSS 如何设置父元素的透明度而不影响子元素的透明度CSS 如何设置父元素的透明度而不影响子元素的透明度 在 CSS 中,设置父元素的透明度(如通过 opacity 属性)会影响所有子元素的透明度,因为 opacity 是作用于整个元素及其内容的。如果想让父元素透明但不影响子元素的透明度…

ASE20N40-ASEMI工业电源专用ASE20N40

ASE20N40-ASEMI工业电源专用ASE20N40编辑:LL ASE20N40-ASEMI工业电源专用ASE20N40 型号:ASE20N40 品牌:ASEMI 封装:TO-220 最大漏源电流:20A 漏源击穿电压:400V 批号:最新 RDS(ON)Max:216mΩ 引脚数量:3 沟道类型:N沟道MOS管 封装尺寸:如图 特性:MOS管、N沟道MO…

GreatSQL 为何选择全表扫描而不选索引

GreatSQL 为何选择全表扫描而不选索引 1. 问题背景 在生产环境中,发现某些查询即使有索引,也没有使用索引,反而选择了全表扫描。这种现象的根本原因在于优化器评估索引扫描的成本时,认为使用索引的成本高于全表扫描。 2. 场景复现 2.1 环境信息机器 IP:192.168.137.120 Gr…

Profibus DP转EtherCAT实例展示欧姆龙PLC对接西门子变频器操作

一. 案例背景 在一个小型工厂,现场设备需求是Profibus DP转EtherCAT,两端设备分别是西门子变频器和欧姆龙PLC通讯,。为提高现场的工作效率,采纳捷米特JM-DPM-ECT网关模块来实现数据的互联互通。二.设备介绍 1.欧姆龙PLC 欧姆龙PLC是一种功能完善的紧凑型PLC,能为业界领先的…

bayaim-如何保证Redis中的数据都是热点数据?

——————————————————————————————————————————————————— ---- bayaim,申明:本文摘自:https://mp.weixin.qq.com/s?__biz=MzAwNDUxOTQ5MQ==&mid=2247623691&idx=1&sn=35e1b6e9206458f9fcd99e48bebccc13&…

translator

import streamlit as st import time import base64 from streamlit.components.v1 import html# 自定义CSS样式 def set_custom_style():st.markdown(""" <style>/* 页面背景:浅色渐变,提高可读性 */.main {background: linear-gradient(135deg, #E0F7…

AI+CRM纷享汇湖南株洲站圆满落幕

近日,由株洲工业和信息化局指导,湘数促会株洲联络处主办,华为云、纷享销客共同承办的“智变未来营销破局”暨企业CRM应用与发展趋势纷享汇在湖南株洲成功举办。此次活动吸引了近40位湘企高管参与,共同探讨企业数字化转型与营销破局的新路径。 一、华为云助力企业数字化转型…

postgresql 16版本之后使用yum方式下载

1.登录下载地址https://www.postgresql.org/download/linux/redhat/

bilibili 分段进度跳转

编辑好后重新投稿即可