以下是一篇关于 RabbitMQ 的博客内容,涵盖了从基础到死信队列的实现,以及 RabbitMQ 其他常用知识点的补充。内容逻辑清晰,代码完整,适合直接发布。
使用 RabbitMQ 实现消息队列与死信队列:从基础到高级
在现代分布式系统中,消息队列(如 RabbitMQ)是解耦和异步通信的重要工具。本文将基于 Spring Boot 和 RabbitMQ,从基础到高级,逐步实现以下功能:
- 发送消息到队列。
- 发送消息到交换机。
- 消息可靠性机制:
- 消息确认机制(Publisher Confirms)。
- 消息持久化(Durable Queues and Messages)。
- 消费者手动确认(Manual Acknowledgement)。
- 死信队列(Dead Letter Queue, DLQ):处理无法被正常消费的消息。
我们将使用一个简单的 User
对象作为消息内容,User
类包含 name
和 age
字段。
1. 创建 User
类
首先,在 service-a
和 service-b
中创建 User
类。
package com.example.common;import java.io.Serializable;public class User implements Serializable {private String name;private int age;// 必须有无参构造函数public User() {}public User(String name, int age) {this.name = name;this.age = age;}// Getter 和 Setterpublic String getName() {return name;}public void setName(String name) {this.name = name;}public int getAge() {return age;}public void setAge(int age) {this.age = age;}@Overridepublic String toString() {return "User{name='" + name + "', age=" + age + "}";}
}
2. 发送消息到队列
2.1 配置队列
在 service-a
中配置一个队列。
package com.example.servicea.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQQueueConfig {@Beanpublic Queue userQueue() {return new Queue("userQueue", true); // 第二个参数表示持久化}
}
2.2 发送消息
在 service-a
中发送 User
对象到队列。
package com.example.servicea.service;import com.example.common.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class QueueMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendUserToQueue(User user) {rabbitTemplate.convertAndSend("userQueue", user);System.out.println("Sent user to queue: " + user);}
}
2.3 接收消息
在 service-b
中监听队列并接收 User
对象。
package com.example.serviceb.service;import com.example.common.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class QueueMessageReceiver {@RabbitListener(queues = "userQueue")public void receiveUserFromQueue(User user) {System.out.println("Received user from queue: " + user);}
}
3. 发送消息到交换机
3.1 配置交换机和队列
在 service-a
中配置一个 Direct Exchange 并绑定队列。
package com.example.servicea.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQExchangeConfig {@Beanpublic DirectExchange userExchange() {return new DirectExchange("userExchange", true, false); // 第二个参数表示持久化}@Beanpublic Queue userExchangeQueue() {return new Queue("userExchangeQueue", true); // 第二个参数表示持久化}@Beanpublic Binding bindingUserExchangeQueue(DirectExchange userExchange, Queue userExchangeQueue) {return BindingBuilder.bind(userExchangeQueue).to(userExchange).with("user.routing.key");}
}
3.2 发送消息
在 service-a
中发送 User
对象到交换机。
package com.example.servicea.service;import com.example.common.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class ExchangeMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendUserToExchange(User user) {rabbitTemplate.convertAndSend("userExchange", "user.routing.key", user);System.out.println("Sent user to exchange: " + user);}
}
3.3 接收消息
在 service-b
中监听队列并接收 User
对象。
package com.example.serviceb.service;import com.example.common.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class ExchangeMessageReceiver {@RabbitListener(queues = "userExchangeQueue")public void receiveUserFromExchange(User user) {System.out.println("Received user from exchange: " + user);}
}
4. 消息可靠性
4.1 消息确认机制(Publisher Confirms)
在 application.yml
中启用 Publisher Confirms 和 Returns。
spring:rabbitmq:host: localhostport: 5672username: guestpassword: guestpublisher-confirm-type: correlated # 启用 Publisher Confirmspublisher-returns: true # 启用 Publisher Returns
在 service-a
中配置 RabbitTemplate
以支持 Publisher Confirms 和 Returns。
package com.example.servicea.config;import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQConfig {@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);// 启用 Publisher Confirms 和 ReturnsrabbitTemplate.setMandatory(true);// 设置确认回调rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {if (ack) {System.out.println("Message confirmed with correlation data: " + correlationData);} else {System.out.println("Message failed with cause: " + cause);}});// 设置返回回调rabbitTemplate.setReturnsCallback(returned -> {System.out.println("Returned message: " + returned.getMessage());System.out.println("Reply code: " + returned.getReplyCode());System.out.println("Reply text: " + returned.getReplyText());System.out.println("Exchange: " + returned.getExchange());System.out.println("Routing key: " + returned.getRoutingKey());});return rabbitTemplate;}
}
4.2 消息持久化
在配置队列和交换机时启用持久化。
@Bean
public Queue userQueue() {return new Queue("userQueue", true); // 第二个参数表示持久化
}@Bean
public DirectExchange userExchange() {return new DirectExchange("userExchange", true, false); // 第二个参数表示持久化
}
在发送消息时设置消息为持久化。
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessageProperties;
import com.fasterxml.jackson.databind.ObjectMapper;@Service
public class ReliableMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate ObjectMapper objectMapper;public void sendUserWithConfirmation(User user) throws IOException {// 生成唯一的 CorrelationDataCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());// 设置消息属性MessageProperties properties = new MessageProperties();properties.setContentType("application/json"); // 明确设置 content-typeproperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT); // 持久化消息byte[] body = objectMapper.writeValueAsBytes(user);Message message = new Message(body, properties);// 发送消息rabbitTemplate.send("userExchange", "user.routing.key", message, correlationData);System.out.println("Sent user with confirmation: " + user);}
}
4.3 消费者手动确认
在 service-b
的 application.yml
中启用手动确认。
spring:rabbitmq:listener:simple:acknowledge-mode: manual
在 service-b
中实现手动确认逻辑。
package com.example.serviceb.service;import com.example.common.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;import java.io.IOException;@Service
public class ManualAckReceiver {@RabbitListener(queues = "userQueue")public void receiveUser(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {System.out.println("Received user from queue: " + user);// 手动确认消息channel.basicAck(tag, false);} catch (Exception e) {// 拒绝消息并重新入队channel.basicNack(tag, false, true);}}
}
5. 死信队列(Dead Letter Queue, DLQ)
5.1 配置死信队列
在 service-a
中配置死信队列和普通队列。
package com.example.servicea.config;import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class RabbitMQDLXConfig {// 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange("normalExchange");}// 普通队列,配置死信交换机@Beanpublic Queue normalQueue() {return QueueBuilder.durable("normalQueue").deadLetterExchange("dlxExchange") // 指定死信交换机.deadLetterRoutingKey("dlx.routing.key") // 指定死信路由键.build();}// 绑定普通队列到普通交换机@Beanpublic Binding bindingNormalQueue(DirectExchange normalExchange, Queue normalQueue) {return BindingBuilder.bind(normalQueue).to(normalExchange).with("normal.routing.key");}// 死信交换机@Beanpublic DirectExchange dlxExchange() {return new DirectExchange("dlxExchange");}// 死信队列@Beanpublic Queue dlqQueue() {return new Queue("dlqQueue");}// 绑定死信队列到死信交换机@Beanpublic Binding bindingDlqQueue(DirectExchange dlxExchange, Queue dlqQueue) {return BindingBuilder.bind(dlqQueue).to(dlxExchange).with("dlx.routing.key");}
}
5.2 发送消息到普通队列
在 service-a
中发送消息到普通队列。
package com.example.servicea.service;import com.example.common.User;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;@Service
public class NormalMessageSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendUserToNormalQueue(User user) {rabbitTemplate.convertAndSend("normalExchange", "normal.routing.key", user);System.out.println("Sent user to normal queue: " + user);}
}
5.3 消费普通队列的消息
在 service-b
中消费普通队列的消息,并模拟消息处理失败。
package com.example.serviceb.service;import com.example.common.User;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;import java.io.IOException;@Service
public class NormalMessageReceiver {@RabbitListener(queues = "normalQueue")public void receiveUserFromNormalQueue(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {try {System.out.println("Received user from normal queue: " + user);if (user.getName().equals("Bob")) {throw new RuntimeException("Simulated processing failure");}// 手动确认消息channel.basicAck(tag, false);} catch (Exception e) {// 拒绝消息并重新入队channel.basicNack(tag, false, false); // 不重新入队,消息会被路由到死信队列System.out.println("Message rejected and sent to DLQ: " + user);}}
}
5.4 消费死信队列的消息
在 service-b
中消费死信队列的消息。
package com.example.serviceb.service;import com.example.common.User;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;@Service
public class DLQMessageReceiver {@RabbitListener(queues = "dlqQueue")public void receiveUserFromDLQ(User user) {System.out.println("Received user from DLQ: " + user);}
}
6. 测试死信队列
6.1 发送消息
在 service-a
中发送消息到普通队列:
normalMessageSender.sendUserToNormalQueue(new User("Alice", 25));
normalMessageSender.sendUserToNormalQueue(new User("Bob", 30));
6.2 观察日志
- 正常消息(
Alice
)会被消费并确认:Received user from normal queue: User{name='Alice', age=25}
- 失败消息(
Bob
)会被拒绝并路由到死信队列:Received user from normal queue: User{name='Bob', age=30} Message rejected and sent to DLQ: User{name='Bob', age=30} Received user from DLQ: User{name='Bob', age=30}
7. 总结
通过以上步骤,我们实现了 RabbitMQ 的死信队列功能:
- 普通队列:绑定到普通交换机,配置了死信交换机和路由键。
- 死信队列:绑定到死信交换机,用于存储无法被正常消费的消息。
- 消息处理:
- 正常消息被消费并确认。
- 失败消息被拒绝并路由到死信队列。
- 死信队列消费:单独消费死信队列中的消息。
这种机制非常适合处理异常情况下的消息,确保系统的可靠性和可维护性。
8. 其他常用知识点
8.1 消息过期(TTL)
可以为队列或消息设置过期时间(Time-To-Live, TTL)。过期后的消息会被路由到死信队列。
设置队列 TTL:
@Bean
public Queue normalQueue() {return QueueBuilder.durable("normalQueue").deadLetterExchange("dlxExchange").deadLetterRoutingKey("dlx.routing.key").ttl(60000) // 设置队列中消息的 TTL 为 60 秒.build();
}
设置消息 TTL:
MessageProperties properties = new MessageProperties();
properties.setExpiration("60000"); // 设置消息的 TTL 为 60 秒
Message message = new Message(body, properties);
rabbitTemplate.send("normalExchange", "normal.routing.key", message);
8.2 优先级队列
可以为队列设置优先级,优先级高的消息会被优先消费。
设置优先级队列:
@Bean
public Queue priorityQueue() {return QueueBuilder.durable("priorityQueue").maxPriority(10) // 设置最大优先级为 10.build();
}
发送优先级消息:
MessageProperties properties = new MessageProperties();
properties.setPriority(5); // 设置消息优先级为 5
Message message = new Message(body, properties);
rabbitTemplate.send("priorityExchange", "priority.routing.key", message);
希望这篇博客对你有所帮助!如果有任何问题或建议,欢迎在评论区留言。