关于保证消息的可靠性,可以从rabbitmq的组成部分来分析,第一部分发送方,第二部分服务端,第三部分消费方,第四部分兜底部分
1.生产者发送确认机制,当生产者发送消息到rabbitmq后,rabbitmq会给生产者一个确认,告诉生产者这个消息我收到且保存
java代码
channel.confirmSelect(); // 启用生产者确认
if (channel.waitForConfirms()) {System.out.println("Message successfully delivered!"); } else {System.out.println("Message delivery failed!"); }
import com.rabbitmq.client.*;public class PublisherConfirmProducer {private final static String QUEUE_NAME = "persistent_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 声明持久化队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 启用发布者确认模式channel.confirmSelect();String message = "Hello RabbitMQ with Publisher Confirms!";// 发布消息channel.basicPublish("", QUEUE_NAME, new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());// 等待确认if (channel.waitForConfirms()) {System.out.println("Message successfully delivered!");} else {System.out.println("Message delivery failed!");}}} }
2.服务端方面,消息持久化,也就是把消息保存到磁盘中,即rabbitmq重启后,队列依旧存在,未消费的消息依旧存在
java代码方面创建持久化队列,可以消息持久化
import com.rabbitmq.client.*;public class Producer {private final static String QUEUE_NAME = "persistent_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 声明一个持久化的队列channel.queueDeclare(QUEUE_NAME, true, false, false, null);String message = "Hello RabbitMQ!";// 发布持久化消息channel.basicPublish("", QUEUE_NAME, new AMQP.BasicProperties.Builder().deliveryMode(2).build(), // 设置消息持久化message.getBytes());System.out.println("Sent: " + message);}} }
3.从消费方设计,消息消费后进行一个手动的确认,取代原有的自动确认
java代码方面
import com.rabbitmq.client.*;public class Consumer {private final static String QUEUE_NAME = "persistent_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 声明持久化队列(即便消费者已经运行,这里仍然要声明队列)channel.queueDeclare(QUEUE_NAME, true, false, false, null);// 设置消费者回调,手动确认消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("Received: " + message);// 发送确认,表示已成功处理该消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 开始消费消息,设置自动确认为 false,开启手动确认channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}} }
4.兜底方案,即死信队列,你可能会问"消费者未能消费的,不做确认不就好了,消息依旧在rabbitmq啊,为啥要使用死信队列呢",这里使用死信队列,是为了这些失败的消息避免被重复消息,因为不可控制的原因,消费失败的问题短时间会持续存在,使用死信队列为了给正常的
消息腾空间,同时使用死信队列,也是为了更好的分析失败原因,总体来说就是好坑位要留给需要的人
java代码使用死信队列
import com.rabbitmq.client.*;public class DeadLetterProducer {private final static String QUEUE_NAME = "main_queue";private final static String DLQ_NAME = "dead_letter_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {// 创建死信队列channel.queueDeclare(DLQ_NAME, true, false, false, null);// 创建主队列并设置死信交换机和路由键channel.queueDeclare(QUEUE_NAME, true, false, false, Map.of("x-dead-letter-exchange", "", "x-dead-letter-routing-key", DLQ_NAME));String message = "Message for Dead Letter Queue";// 发布消息channel.basicPublish("", QUEUE_NAME, new AMQP.BasicProperties.Builder().deliveryMode(2).build(), message.getBytes());System.out.println("Sent to main queue: " + message);}} }
以上从四个方面介绍保证rabbitmq消息可靠性措施