1.死信交换机
一致不被消费的信息/过期的信息/被标记nack/reject的信息,这些消息都可以进入死信交换机,但是首先要配置的有私信交换机。私信交换机可以再RabbitMQ的客户端上选定配置-dead-letter-exchange。
2.延迟消息
像我们买车票,外卖订单,如果下了单没付款,都会显示一个30分钟后未支付将进行取消订单,这种都算是延迟消息,设置了过期时间,一直不消费,到期后就会进入死信队列。
MQ提供了延迟消息的组件:
Scheduling Messages with RabbitMQ | RabbitMQGitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
安装组件后,在使用时,基于注解要声明延迟交换机
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "delay.queue", durable = "true"),exchange = @Exchange(name = "delay.direct", delayed = "true"),key = "delay"
))
public void listenDelayMessage(String msg){log.info("接收到delay.queue的延迟消息:{}", msg);
}
发送延迟消息:设置延长时间
@Test
void testPublisherDelayMessage() {// 1.创建消息String message = "hello, delayed message";// 2.发送消息,利用消息后置处理器添加消息头rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {// 添加延迟消息属性message.getMessageProperties().setDelay(5000);return message;}});
}
3.对于订单的延迟消息,应该主动去查询是否已经支付成功,可以定时去发送消息,查询状态,确认是否已经状态改变。
消息监听侧,如果初始没有支付,可以根据上图不同的节点去再次发起消息查询:
@Slf4j
@Component
@RequiredArgsConstructor
public class OrderStatusListener {private final IOrderService orderService;private final PayClient payClient;private final RabbitTemplate rabbitTemplate;@RabbitListener(bindings = @QueueBinding(value = @Queue(name = MqConstants.DELAY_ORDER_QUEUE, durable = "true"),exchange = @Exchange(name = MqConstants.DELAY_EXCHANGE, type = ExchangeTypes.TOPIC),key = MqConstants.DELAY_ORDER_ROUTING_KEY))public void listenOrderCheckDelayMessage(MultiDelayMessage<Long> msg) {// 1.获取消息中的订单idLong orderId = msg.getData();// 2.查询订单,判断状态:1是未支付,大于1则是已支付或已关闭Order order = orderService.getById(orderId);if (order == null || order.getStatus() > 1) {// 订单不存在或交易已经结束,放弃处理return;}// 3.可能是未支付,查询支付服务PayOrderDTO payOrder = payClient.queryPayOrderByBizOrderNo(orderId);if (payOrder != null && payOrder.getStatus() == 3) {// 支付成功,更新订单状态orderService.markOrderPaySuccess(orderId);return;}// 4.确定未支付,判断是否还有剩余延迟时间if (msg.hasNextDelay()) {// 4.1.有延迟时间,需要重发延迟消息,先获取延迟时间的int值int delayVal = msg.removeNextDelay().intValue();// 4.2.发送延迟消息rabbitTemplate.convertAndSend(MqConstants.DELAY_EXCHANGE, MqConstants.DELAY_ORDER_ROUTING_KEY, msg,message -> {message.getMessageProperties().setDelay(delayVal);return message;});return;}// 5.没有剩余延迟时间了,说明订单超时未支付,需要取消订单orderService.cancelOrder(orderId);}
}