- 基于注解方式
在consumer服务基于@RabbitListener注解来声明队列、交换机和绑定队列和交换机,并且设置交换机为延迟交换机:
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "delay.queue", durable = "true"),exchange = @Exchange(value = "delay.direct", delayed = "true"),key = "delay"))
public void listenDelayQueue(String msg) {log.info("delay.queue:" + msg);
}
delayed = "true" |
设置交换机为延迟交换机 |
- 基于@Bean方式
在consumer服务基于@Bean注解来声明交换机、队列和绑定队列和交换机,并且设置交换机为延迟交换机:
@Configuration
public class DirectConfiguration {@Beanpublic DirectExchange delayExchange() {return ExchangeBuilder.directExchange("delay.direct").delayed().durable(true).build();}@Beanpublic Queue delayedQueue() {return new Queue("delay.queue");}@Beanpublic Binding delayQueueBinding() {return BindingBuilder.bind(delayedQueue()).to(delayExchange()).with("delay");}
}
- 延迟消息发送
在publisher服务中的测试类添加一个测试方法,通过消息头x-delay来设置过期时间,实现延迟消息发送:
@Testvoid testSendDelayMessage() {rabbitTemplate.convertAndSend("delay.direct", "delay", "hello", new MessagePostProcessor() {@Overridepublic Message postProcessMessage(Message message) throws AmqpException {message.getMessageProperties().setDelay(10000);return message;}});log.info("消息发送成功");}
.setDelay() |
通过消息头x-delay来设置消息的延迟时间 |