1.死信队列+TTL
-
什么是TTL
- time to live:消息存活时间
- 如果消息在存活时间内未被消费,则会被清除
- RabbitMQ支持两种TTL设置
- 单独消息进行配置TTL
- 整个队列进行配置TTL(使用居多)
-
什么是RabbitMQ的死信队列
- 没有被及时消费的消息存放的队列
-
什么是RabbitMQ的死信交换机
- Dead Letter Exchange(死信交换机,缩写DLX),当消息成为死信后,会被重新发送到另一个交换机,这个交换机就是DLX死信交换机
-
消息有哪几种情况成为死信
-
消费者拒收消息(basic.reject/basic.nack),并且没有重新入队requeue=false
-
消息在队列中未被消费,且超过队列或者消息本身的过期时间TTL(time to live)
-
队列的消息长度达到极限
-
结果:消息成为死信后,如果该队列绑定了死信交换机,则消息会被死信交换机重新路由到死信队列
-
2.使用管控台测试死信队列
-
RabbitMQ管控台消息TTL测试
-
队列过期时间使用参数,对整个队列消息统一过期
- x-message-ttl:单位ms毫秒
-
消息过期时间使用参数(如果队列头部消息未过期,队列中间消息已经过期,该消息还在队列里面)
- expiration:单位ms毫秒
-
两者都配置的话,时间短的先触发
-
-
RabbitMQ Web控制台测试
-
新建死信交换机(和普通交换机没区别)
-
新建死信队列(和普通队列没区别)
-
死信交换机和死信队列绑定
-
新建普通队列,设置过期时间、指定死信交换机
-
测试:直接在Web控制台往普通队列发送消息即可
-
3.延迟队列
-
什么是延迟队列
- 一种带有延迟功能的消息队列,Producer将消息发送到消息队列服务端,但并不期望这条消息立马投递,而是推迟在当前时间点之后的某一个时间投递到Consumer进行消费,该消息即定时消息
-
业界的一些实现延迟方式
- 定时任务高精度轮训
- 采用RocketMQ自带延迟消息功能
- RabbitMQ本身是不支持延迟队列的,怎么办?
- 结合死信队列的特性,就可以做到延迟消息
-
交换机和队列注册代码
package com.gen.config;import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;import java.util.HashMap; import java.util.Map;@Configuration public class RabbitMQConfig {/*** 死信交换机*/public static final String DEAD_EXCHANGE = "dead_exchange";/*** 死信队列*/public static final String DEAD_QUEUE = "dead_queue";/*** 死信路由键*/public static final String DEAD_ROUTING_KEY = "dead_routing_key";/*** 死信交换机** @return*/@Beanpublic Exchange deadExchange() {return ExchangeBuilder.topicExchange(DEAD_EXCHANGE).durable(true).build();}/*** 死信队列** @return*/@Beanpublic Queue deadQueue() {return QueueBuilder.durable(DEAD_QUEUE).build();}/*** 死信交换机与死信队列进行绑定** @param deadQueue* @param deadExchange* @return*/@Beanpublic Binding deadBinding(Queue deadQueue, Exchange deadExchange) {return BindingBuilder.bind(deadQueue).to(deadExchange).with(DEAD_ROUTING_KEY).noargs();}/*** 普通交换机*/public static final String ORDER_EXCHANGE = "order_exchange";/*** 普通队列*/public static final String ORDER_QUEUE = "order_queue";/*** 普通路由键*/public static final String ORDER_ROUTING_KEY = "order_routing_key";/*** 普通交换机** @return*/@Beanpublic Exchange orderExchange() {return ExchangeBuilder.topicExchange(ORDER_EXCHANGE).durable(true).build();}/*** 普通队列** @return*/@Beanpublic Queue orderQueue() {Map<String, Object> args = new HashMap<>(3);// 过期时间,单位毫秒args.put("x-message-ttl", 10000);// 消息过期后,进入到死信交换机args.put("x-dead-letter-exchange", DEAD_EXCHANGE);// 消息过期后,进入到死信交换机的路由键args.put("x-dead-letter-routing-key", DEAD_ROUTING_KEY);return QueueBuilder.durable(ORDER_QUEUE).withArguments(args).build();}/*** 普通交换机与普通队列进行绑定** @param orderQueue* @param orderExchange* @return*/@Beanpublic Binding orderBinding(Queue orderQueue, Exchange orderExchange) {return BindingBuilder.bind(orderQueue).to(orderExchange).with(ORDER_ROUTING_KEY).noargs();}}
-
消息生产者
package com.gen;import com.gen.config.RabbitMQConfig; import org.junit.jupiter.api.Test; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest;@SpringBootTest class GenRabbitmqApplicationTests {@Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid send() {this.rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, RabbitMQConfig.ORDER_ROUTING_KEY, "测试延迟队列,设置10s");}}
-
消息消费者(只监听消费死信队列,不监听消费普通队列)
package com.gen.listener;import com.gen.config.RabbitMQConfig; import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;import java.io.IOException;@Component @RabbitListener(queues = RabbitMQConfig.DEAD_QUEUE) public class DeadMQListener {@RabbitHandlerpublic void deadConsumer(String msg, Message message, Channel channel) throws IOException {System.out.println(msg);// 成功确认,消费成功channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);} }