文章目录
- 一、RabbitMQ消息可靠性概述
- 1、引出问题
- 2、RabbitMQ消息可靠性保证的四个环节
- 二、保证生产者消息发送到RabbitMQ服务器
- 1、服务端确认:Transaction模式
- (1)JavaAPI
- (2)springbootAPI
- 2、服务端确认:Confirm模式
- (1)JavaAPI
- (2)springbootAPI
- 三、保证消息从交换机路由到队列
- 1、消息回发
- (1)JavaAPI
- (2)SpringbootAPI
- 2、消息路由到备份交换机
- (1)JavaAPI
- 四、保证消息在队列中可靠存储
- 1、队列持久化
- 2、交换机持久化
- 3、消息持久化
- 4、搭建集群
- 五、保证消费者成功消费消息
- 1、自动ACK
- 2、手动ACK
- 六、保证消息一致之消费者回调
- 1、调用生产者API
- 2、发送响应消息给生产者
- 七、保证消息一致性之补偿机制
- 1、消息结果查证
- 2、消息重发
- 3、消息幂等性
- 4、最终一致性
- 八、消息的顺序性
一、RabbitMQ消息可靠性概述
1、引出问题
我们先看一串代码,并思考一下为什么要先入库然后发MQ:
public int add(Merchant merchant) {int k = merchantMapper.add(merchant);System.out.println("aaa : "+merchant.getId());JSONObject title = new JSONObject();String jsonBody = JSONObject.toJSONString(merchant);title.put("type","add");title.put("desc","新增商户");title.put("content",jsonBody);rabbitTemplate.convertAndSend(topicExchange,topicRoutingKey, title.toJSONString());return k;
}
如果先发MQ的话,如果入库失败,就会导致MQ消息无法回滚了。今天我们就好好聊一聊RabbitMQ消息可靠投递的问题。
2、RabbitMQ消息可靠性保证的四个环节
① 消息从生产者发送到Broker
生产者把消息发送到Broker之后,如何知道自己的消息有没有被Broker成功接收?如果Broker不给应答,生产者发送的消息也无法知道成功还是失败。
② 消息从Exchange路由到Queue
Exchange交换机维护一个与Queue的绑定列表,它的职责是分发消息。如果交换机处理消息的分发出现问题怎么办?
③ 消息存储在Queue
RabbitMQ的队列有自己的数据库(Mnesia),它是真正用来存储消息的。如果还没有消费者来消费,消息要一直存储在队列中。队列中的消息有没有丢失的可能?怎么保证消息在队列中稳定的存储呢?
④ 消费者订阅Queue并消费消息
队列是先进先出的,消息投递给消费者是一条一条投递的,如何能保证消费者正确地消费了消息?
二、保证生产者消息发送到RabbitMQ服务器
生产者发送RabbitMQ消息时,如果遇到了网络问题或者Broker的问题(硬盘故障、磁盘写满等),就会导致消息发送失败,生产者无法确定Broker是否正确地接收到消息。
在RabbitMQ中提供了两种生产者确认机制,也就是说生产者发送消息给RabbitMQ时,服务端会通过某种方式返回一个应答,只要生产者收到了这个应答,就知道消息发送成功了。
1、服务端确认:Transaction模式
在事务模式里面,只有收到了服务端的Commit-OK指令,才能提交成功。所以可以解决生产者和服务端确认的问题。但是事务模式有一个缺点,它是阻塞的,一条消息没有发送完毕,不能发送下一条消息,它会榨干RabbitMQ服务器的性能。所以不建议在生产环境使用。
(1)JavaAPI
import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 消息生产者,测试事务模式。发送消息的效率比较低,建议使用Confirm模式* 参考文章:https://www.cnblogs.com/vipstone/p/9350075.html*/
public class TransactionProducer {private final static String QUEUE_NAME = "ORIGIN_QUEUE";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));// 建立连接Connection conn = factory.newConnection();// 创建消息通道Channel channel = conn.createChannel();String msg = "Hello world, Rabbit MQ";// 声明队列(默认交换机AMQP default,Direct)// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentschannel.queueDeclare(QUEUE_NAME, false, false, false, null);try {// 开启事务channel.txSelect();// 发送消息,发布了4条,但只确认了3条// String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());channel.txCommit(); // 提交channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());channel.txCommit();channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());channel.txCommit();channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());int i =1/0;channel.txCommit();channel.basicPublish("", QUEUE_NAME, null, (msg).getBytes());channel.txCommit();System.out.println("消息发送成功");} catch (Exception e) {channel.txRollback(); // 回滚System.out.println("消息已经回滚");}channel.close();conn.close();}
}
(2)springbootAPI
rabbitTemplate.setChannelTransacted(true);
2、服务端确认:Confirm模式
Confirm模式有三种,单个确认、批量确认、异步确认。
(1)JavaAPI
import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 普通确认模式*/
public class NormalConfirmProducer {private final static String QUEUE_NAME = "ORIGIN_QUEUE";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));// 建立连接Connection conn = factory.newConnection();// 创建消息通道Channel channel = conn.createChannel();String msg = "Hello world, Rabbit MQ ,Normal Confirm";// 声明队列(默认交换机AMQP default,Direct)// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentschannel.queueDeclare(QUEUE_NAME, false, false, false, null);// 开启发送方确认模式channel.confirmSelect();channel.basicPublish("", QUEUE_NAME, null, msg.getBytes());// 普通Confirm,发送一条,确认一条if (channel.waitForConfirms()) {System.out.println("消息发送成功" );}else{System.out.println("消息发送失败");}channel.close();conn.close();}
}
import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;/*** 消息生产者,测试批量Confirm模式*/
public class BatchConfirmProducer {private final static String QUEUE_NAME = "ORIGIN_QUEUE";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));// 建立连接Connection conn = factory.newConnection();// 创建消息通道Channel channel = conn.createChannel();String msg = "Hello world, Rabbit MQ ,Batch Confirm";// 声明队列(默认交换机AMQP default,Direct)// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentschannel.queueDeclare(QUEUE_NAME, false, false, false, null);try {// 开启confirm模式channel.confirmSelect();for (int i = 0; i < 5; i++) {// 发送消息// String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());}// 批量确认结果,ACK如果是Multiple=True,代表ACK里面的Delivery-Tag之前的消息都被确认了// 比如5条消息可能只收到1个ACK,也可能收到2个(抓包才看得到)// 直到所有信息都发布,只要有一个未被Broker确认就会IOExceptionchannel.waitForConfirmsOrDie();System.out.println("消息发送完毕,批量确认成功");} catch (Exception e) {// 发生异常,可能需要对所有消息进行重发e.printStackTrace();}channel.close();conn.close();}
}
import com.gupaoedu.util.ResourceUtil;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.ConfirmListener;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;import java.io.IOException;
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;/*** 消息生产者,测试异步Confirm模式* 参考文章:https://www.cnblogs.com/vipstone/p/9350075.html*/
public class AsyncConfirmProducer {private final static String QUEUE_NAME = "ORIGIN_QUEUE";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setUri(ResourceUtil.getKey("rabbitmq.uri"));// 建立连接Connection conn = factory.newConnection();// 创建消息通道Channel channel = conn.createChannel();String msg = "Hello world, Rabbit MQ, Async Confirm";// 声明队列(默认交换机AMQP default,Direct)// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> argumentschannel.queueDeclare(QUEUE_NAME, false, false, false, null);// 用来维护未确认消息的deliveryTagfinal SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());// 这里不会打印所有响应的ACK;ACK可能有多个,有可能一次确认多条,也有可能一次确认一条// 异步监听确认和未确认的消息// 如果要重复运行,先停掉之前的生产者,清空队列channel.addConfirmListener(new ConfirmListener() {public void handleNack(long deliveryTag, boolean multiple) throws IOException {System.out.println("Broker未确认消息,标识:" + deliveryTag);if (multiple) {// headSet表示后面参数之前的所有元素,全部删除confirmSet.headSet(deliveryTag + 1L).clear();} else {confirmSet.remove(deliveryTag);}// 这里添加重发的方法}public void handleAck(long deliveryTag, boolean multiple) throws IOException {// 如果true表示批量执行了deliveryTag这个值以前(小于deliveryTag的)的所有消息,如果为false的话表示单条确认System.out.println(String.format("Broker已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple));if (multiple) {// headSet表示后面参数之前的所有元素,全部删除confirmSet.headSet(deliveryTag + 1L).clear();} else {// 只移除一个元素confirmSet.remove(deliveryTag);}System.out.println("未确认的消息:"+confirmSet);}});// 开启发送方确认模式channel.confirmSelect();for (int i = 0; i < 10; i++) {long nextSeqNo = channel.getNextPublishSeqNo();// 发送消息// String exchange, String routingKey, BasicProperties props, byte[] bodychannel.basicPublish("", QUEUE_NAME, null, (msg +"-"+ i).getBytes());confirmSet.add(nextSeqNo);}System.out.println("所有消息:"+confirmSet);// 这里注释掉的原因是如果先关闭了,可能收不到后面的ACK//channel.close();//conn.close();}
}
(2)springbootAPI
Confirm模式是在Channel上开启的,RabbitTemplate对Channel进行了封装。
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {public void confirm(CorrelationData correlationData, boolean ack, String cause) {System.out.println("--------收到服务端异步确认--------");System.out.println("received ack: "+ack);System.out.println("cause: "+cause);System.out.println("correlationId: "+correlationData.getId());if (!ack) {System.out.println("发送消息失败:" + cause);throw new RuntimeException("发送异常:" + cause);// 做一些回滚、重试、记录处理} else {System.out.println("消息确认成功");}}
});
三、保证消息从交换机路由到队列
如果routingkey错误,或者队列不存在(但是生产环境基本不会出现这么低级的错误),就会导致消息从交换机无法路由到队列。
我们有两种方式处理无法路由的消息,一种是服务端回发给生产者,一种是让交换机路由到另一个备份的交换机。
1、消息回发
(1)JavaAPI
channel.addReturnListener(new ReturnListener() {public void handleReturn(int replyCode,String replyText,String exchange,String routingKey,AMQP.BasicProperties properties,byte[] body)throws IOException {System.out.println("=========监听器收到了无法路由,被返回的消息============");System.out.println("replyText:"+replyText);System.out.println("exchange:"+exchange);System.out.println("routingKey:"+routingKey);System.out.println("message:"+new String(body));}
});
(2)SpringbootAPI
Springboot消息回发是使用mandatory参数和ReturnListener(在SPringAMQP中是ReturnCallback)
// 为RabbitTemplate设置ReturnCallback
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {@Overridepublic void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {try {System.out.println("--------收到无法路由回发的消息--------");System.out.println("replyCode:" + replyCode);System.out.println("replyText:" + replyText);System.out.println("exchange:" + exchange);System.out.println("routingKey:" + routingKey);System.out.println("properties:" + message.getMessageProperties());System.out.println("body:" + new String(message.getBody(), "UTF-8"));} catch (UnsupportedEncodingException e) {e.printStackTrace();}}
});
2、消息路由到备份交换机
在创建交换机时,从属性中指定备份交换机。
(1)JavaAPI
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2).contentEncoding("UTF-8").build();// 备份交换机
channel.exchangeDeclare("ALTERNATE_EXCHANGE","topic", false, false, false, null);
channel.queueDeclare("ALTERNATE_QUEUE", false, false, false, null);
channel.queueBind("ALTERNATE_QUEUE","ALTERNATE_EXCHANGE","#");// 在声明交换机的时候指定备份交换机
Map<String,Object> arguments = new HashMap<String,Object>();
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);// 发送到了默认的交换机上,由于没有任何队列使用这个关键字跟交换机绑定,所以会被退回
// 第三个参数是设置的mandatory,如果mandatory是false,消息也会被直接丢弃
channel.basicPublish("TEST_EXCHANGE","error.routingKey",true, properties,"只为更好的你".getBytes());
四、保证消息在队列中可靠存储
如果消息一直没有被消费者消费,消息会一直存储在队列中,并且队列中的数据存储在数据库中。
如果RabbitMQ服务或者硬件发生了故障,比如系统宕机、重启、关闭等等,可能会导致内存中的消息丢失,所以我们要把消息本身和元数据(队列、交换机、绑定)都保存到磁盘中。
1、队列持久化
声明队列时可以设置几个重要的参数:
// 声明队列
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, false, false, false, null);// springboot中
@Bean("reliableQueue")
public Queue queue() {// public Queue(String name, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)return new Queue("RELIABLE_QUEUE", true, false, false, new HashMap<>());
}
参数解析:
durable:没有持久化的队列,保存在内存中,服务重启后队列和消息都会丢失。设置为true可以保证消息持久化
exclusive:排他性队列的特点是:只对首次声明它的连接(Connection)可见;会在其连接断开的时候自动删除。
autoDelete:没有消费者连接的时候,自动删除。
2、交换机持久化
声明交换机时可以设置这几个重要的参数,和队列类似:
// 声明交换机
// String exchange, String type, boolean durable, boolean autoDelete, Map<String, Object> arguments
channel.exchangeDeclare(EXCHANGE_NAME,"direct",false, false, null);// Springboot中
@Bean("directExchange")
public DirectExchange exchange() {// public DirectExchange(String name, boolean durable, boolean autoDelete, Map<String, Object> arguments)return new DirectExchange("RELIABLE_EXCHANGE", true, false, new HashMap<>());
}
3、消息持久化
在生产者发送消息时,可以指定消息的配置:
Map<String, Object> headers = new HashMap<String, Object>();
headers.put("name", "gupao");
headers.put("level", "top");
AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().deliveryMode(2) // 2代表持久化.contentEncoding("UTF-8") // 编码.expiration("10000") // TTL,过期时间.headers(headers) // 自定义属性.priority(5) // 优先级,默认为5,配合队列的 x-max-priority 属性使用.messageId(String.valueOf(UUID.randomUUID())).build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());// springboot中
MessageProperties messageProperties = new MessageProperties();
messageProperties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
messageProperties.setContentType("UTF-8");
Message message = new Message("一条正常的消息".getBytes(), messageProperties);
4、搭建集群
如果只有一个RabbitMQ节点,即使交换机、队列、消息做了持久化,如果服务崩溃或者硬件故障,RabbitMQ的服务一样是不可以的。
所以为了提高MQ服务的可用性,保证消息的传输,我们需要有多个RabbitMQ的节点。
RabbitMQ集群搭建与高可用实现
五、保证消费者成功消费消息
如果消费者收到消息后没来得及处理或者发生了一场,就会导致消费失败。RabbitMQ提供了消费者的消息确认机制(message acknowledgement),消费者可以自动或者手动地发送ACK给服务端。
如果消费者拿到消息没有ACK会怎么样?
没有收到ACK的消息,消费者断开连接之后,RabbitMQ会把这条消息发送给其他消费者。如果没有其他消费者,消费者重启后会重新消费这条消息,重复执行业务逻辑。
1、自动ACK
默认就是自动ACK。消费者收到消息的时候
就会自动发送ACK,而不是方法执行成功的时候发送ACK。这种情况RabbitMQ只会关心消费者是否接收到了消息,对消息的处理结果是不关心的,所以通常来说我们会选择手动ACK。
// 创建消费者
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("Received message : '" + msg + "'");System.out.println("consumerTag : " + consumerTag );System.out.println("deliveryTag : " + envelope.getDeliveryTag() );}
};
// 开始获取消息 autoAck参数为true代表自动ACK
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, true, consumer);
同样的,在Springboot中如果没有特殊配置,默认的就是自动ACK。
2、手动ACK
// 创建消费者,并接收消息
Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,byte[] body) throws IOException {String msg = new String(body, "UTF-8");System.out.println("Received message : '" + msg + "'");if (msg.contains("拒收")){// 拒绝消息// requeue:是否重新入队列,true:是,会再发给其他消费者;false:直接丢弃,相当于告诉队列可以直接删除掉// TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费// basicReject(long deliveryTag, boolean requeue)channel.basicReject(envelope.getDeliveryTag(), false);} else if (msg.contains("异常")){// 批量拒绝// requeue:是否重新入队列// TODO 如果只有这一个消费者,requeue 为true 的时候会造成消息重复消费// basicNack(long deliveryTag, boolean multiple, boolean requeue)channel.basicNack(envelope.getDeliveryTag(), true, false);} else {// 手工应答// 如果不应答,队列中的消息会一直存在,重新连接的时候会重复消费// basicAck(long deliveryTag, boolean multiple)channel.basicAck(envelope.getDeliveryTag(), true);}}
};// 开始获取消息,注意这里开启了手工应答
// String queue, boolean autoAck, Consumer callback
channel.basicConsume(QUEUE_NAME, false, consumer);
在Springboot中,可以这样配置应答方式:
spring.rabbitmq.listener.direct.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=manual
或者在SimpleMessageListenerContainer或者SimpleRabbitListenerContainerFactory中这样设置:
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
消费者中获取到Channel对象,就可以进行应答了:
@Component
@PropertySource("classpath:mq.properties")
@RabbitListener(queues = "${com.queue}", containerFactory="rabbitListenerContainerFactory")
public class SecondConsumer {@RabbitHandlerpublic void process(String msgContent,Channel channel, Message message) throws IOException {System.out.println("Second Queue received msg : " + msgContent );channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}
}
注意这个枚举值:
NONE:自动ACK
MANUAL:手动ACK
AUTO:如果方法未抛出异常,则发送ACK。如果方法抛出异常,并且不是AmqpRejectAndDontRequeueException则发送nack,并且重新入队。如果抛出异常是AmqpRejectAndDontRequeueException则发送nack并不会重新入队。
注意!拒绝消息时如果requeue参数设置为true,可以把这条消息重新存入队列,以便发给下一个消费者处理,如果只有一个消费者时,这种方式可能会出现无限循环重复消费的情况。
六、保证消息一致之消费者回调
消费者成功消费了消息之后,如何告知生产者该消息已经被成功消费了?
虽然说使用MQ的目的之一是解耦,但是某些一致性要求很高的场景,比如说金融业务,还是很有必要通知生产者的。
1、调用生产者API
例如,提单系统给其他系统发送了保险消息后,其他系统必须在处理完消息之后调用提单系统提供的API,来修改提单系统中这笔数据的状态。只要API没有被调用,数据状态没有被修改,提单系统就认为下游系统没有收到这条消息。
但是这种方式又从解耦的状态变成了耦合状态,还是需要根据实际情况来判断是否要采用这种方式。
2、发送响应消息给生产者
例如商业银行与人民银行二代支付系统(使用IBM MQ),无论是人行收到了商业银行的消息,还是商业银行收到了人行的消息,都必须发送一条响应消息(叫做回执报文)。
整个通信的流程设计非常复杂,但是对于金融场景下的消息可靠性保证,是很有用的。
七、保证消息一致性之补偿机制
如果消费者消费了消息之后,一直迟迟没有通知给生产者,我们如何知道消费者已经成功消费了消息?
就像是银行A给银行B发起了一笔转账,银行B一直没有给银行A回调,此时我们如何确定银行B确定是已经处理了该消息?
此时生产者与消费者之间应该约定一个超时时间,对于超出这个时间没有得到响应的消息,才确定为消费失败,比如说5分钟。
1、消息结果查证
假如说生产者5分钟内没有收到消费者消费成功的回调,生产者可以主动发起一次结果查证,通过业务要素或者唯一流水号,查证该业务在消费者是否正常消费。
2、消息重发
假如说消息一直没有结果,就需要考虑消息重发了。
可以启动一个定时任务,比如30秒跑一次,查询业务表业务状态是中间状态的记录,查询出来,构建MQ消息,重新发送。也可以单独设计一张消息表,把本系统所有发送出去的消息全部异步登记,状态是未回复的消息,进行重发(这种方式会对数据库性能造成一定损耗)。
重发机制也不可能一直重复发,如果消费者确实是有bug或者其他问题,如果一直重复发送会导致死循环了。我们可以设置一个衰减机制,第一次间隔一分钟,第二次间隔2分钟,最终发送三次,三次过后如果还没有收到回复,就需要将消息设置为特殊状态,进行人工干预。
3、消息幂等性
消息重发就意味着要处理消息的幂等。
什么是服务的幂等?为什么要实现幂等?
接口的幂等性——详细谈谈接口的幂等即解决方案
4、最终一致性
在一些一致性很强的接口调用中,比如说转账操作,通常会在一天的业务结束之后,第二天营业之前,生产者和消费者之间会进行一次消息对账。生成一个对账文件,两者分别解析对方的文件进行对账,如果确实有消息不一致的情况,会通过短信或者邮件的方式通知业务人员进行手动处理,要么把钱退回,要么把钱补上。
八、消息的顺序性
在RabbitMQ中,一个队列有多个消费者时,由于不同的消费者消费消息的速度是不一样的,顺序无法保证。只有一个队列仅有一个消费者的情况才能保证顺序消费(不同的业务消息发送到不同的专用队列)。
除非负载的场景,不要用多个消费者消费消息,可以保证消息的顺序性。