一、发送 & 接收延迟消息
1.1、概述
延迟消息是指发送者发送完消息后,不希望消息被立即投送给订阅者,等一段时间之后再投递给订阅者,例如生活中我们常见的例子,京东商城购物、12306买火车票...,下完订单后就可以发送一个延迟消息,30分钟后检查该订单的状态,如果用户未付款就取消此订单,释放库存。使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。
1.2、消息模型
1.3、Demo04MQTestApp
/*** @Author : 一叶浮萍归大海* @Date: 2023/12/25 10:26* @Description: 发送 & 接收延迟消息*/
@Slf4j
public class Demo04MQTestApp {/*** 发送延迟消息*/@Testpublic void demo4Producer() throws Exception {// 1、创建一个生产者DefaultMQProducer producer = new DefaultMQProducer("delay-producer-group");// 2、连接NameServerproducer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);// 3、启动producer.start();// 4、创建消息Message message = new Message("delay-topic", "这是一个延迟消息)".getBytes(StandardCharsets.UTF_8));/*** 4.1、在message中设置延迟时间* 注意事项:4.x版本的RocketMQ只支持如下等级的延迟时间,例如:1代表延迟1s,10代表延迟6min,18代表延迟2小时* 1==1s、2==5s、3==10s、4==30s、5==1m、6==2m、7==3m、8==4m、9==5m* 10 ==》6m、11 ==》7m、12 ==》8m、13 ==》9m、14 ==》10m、15 ==》20m、16 ==》30m、17 ==》1h、18 ==》2h*/message.setDelayTimeLevel(4);// 5、发送消息producer.sendOneway(message);log.info("【demo4Producer】发送消息成功,当前时间:{}",LocalDateTimeUtil.format(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss"));// 6、关闭生产者producer.shutdown();}/*** 接收延迟消息(Push方式)*/@Testpublic void demo4PushConsumer() throws Exception {// 1、创建一个消费者DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay-consumer-group");// 2、连接NameServerconsumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER_ADDR);// 3、订阅消息,*表示订阅该主题所有的消息consumer.subscribe("delay-topic", "*");// 4、设置监听器(采用异步回调方式,一直监听)consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {// 业务逻辑log.info("我是消费者【demo4PushConsumer】,当前时间:{}",LocalDateTimeUtil.format(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss"));for (MessageExt msg : msgs) {log.info("我是消费者【demo4PushConsumer】,我收到的消息是:{},当前时间:{}", StrUtil.utf8Str(msg.getBody()),LocalDateTimeUtil.format(LocalDateTime.now(),"yyyy-MM-dd HH:mm:ss"));}/*** 返回值:消费消息成功与否* CONSUME_SUCCESS:表明消费成功,消息会从MQ出队* RECONSUME_LATER:表明消费失败,消息会重新回到队里,过一会儿再重新投递出来给当前消费者或者其他消费者*/return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});// 5、启动consumer.start();log.info("【demo4PushConsumer】启动成功,正在等待接收消息...");// 6、挂起当前JVMSystem.in.read();}}
1.4、RocketMQ中的延迟等级
https://rocketmq.apache.org/zh/docs/4.x/producer/04message3
1.5、测试
先后运行demo4PushConsumer和demo4Producer,观察控制台日志: