2.12日学习打卡----初学RocketMQ(三)

2.12日学习打卡

目录:

  • 2.12日学习打卡
  • 一. RocketMQ高级特性(续)
    • 消息重试
    • 延迟消息
    • 消息查询
  • 二.RocketMQ应用实战
    • 生产端发送同步消息
    • 发送异步消息
    • 单向发送消息
    • 顺序发送消息
    • 消费顺序消息
    • 全局顺序消息
    • 延迟消息
    • 事务消息
    • 消息查询

一. RocketMQ高级特性(续)

消息重试

生产端重试

例如由于网络原因导致生产者发送消息到MQ失败,即发送端没有收到Broker的ACK,导致最终Consumer无法消费消息,此时RocketMQ会自动进行重试。

// 同步发送消息,如果5秒内没有发送成功,则重试3次
DefaultMQProducer producer = new DefaultMQProducer("DefaultProducer");
producer.setRetryTimesWhenSendFailed(3);
producer.send(msg, 5000L);

消费端重试

同样的,由于网络原因,Broker发送消息给消费者后,没有受到消费端的ACK响应,所以Broker又会尝试将消息重新发送给Consumer,在实际开发过程中,我们更应该考虑的是消费端的重试。消费端的消息重试可以分为顺序消息的重试以及无序消息的重试。

  • 顺序消息重试
    对于顺序消息,当消费者消费消息失败后,消息队列 RocketMQ
    会自动不断进行消息重试(每次间隔时间为 1 秒),这时,应用
    会出现消息消费被阻塞的情况。因此,在使用顺序消息时,务必
    保证应用能够及时监控并处理消费失败的情况,避免阻塞现象的
    发生

  • 无序消息重试
    对于无序消息(普通、定时、延时、事务消息),当消费者消费
    消息失败时,可以通过设置返回状态达到消息重试的结果。

    • 最大重试次数
      消息消费失败后,可被消息队列RocketMQ重复投递的最大
      次数。
      TCP协议无序消息重试时间间隔:
      在这里插入图片描述
    • 消费失败后重新配置方式
    • 需要在消息监听器接口的实现中明确进行配置(三种方式任选一种):
      • 返回 ConsumeConcurrentlyStatus.RECONSUME_LATER; (推荐)
      • 返回 Null
      • 抛出异常

    延迟消息

    Producer将消息发送到消息队列RocketMQ服务端,但并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费,该消息即延时消息。
    消息生产和消费有时间窗口要求,例如在电商交易中超时未支付关闭订单的场景,在订单创建时会发送一条延时消息。这条消息将会在30分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。如支付未完成,则关闭订单。如已完成支付则忽略。通过消息触发一些定时任务,例如在某一固定时间点向用户发送提醒消息。
    定时消息会暂存在名为SCHEDULE_TOPIC_XXXX的topic中,并且会根据
    delayTimeLevel存入特定的queue,queueId = delayTimeLevel –1,即一个queue只存相同延迟的消息,保证具有相同发送延迟的消息能够顺序消费。broker会调度地消费SCHEDULE_TOPIC_XXXX,将消息写入真实的topic。

//
org/apache/rocketmq/store/config/MessageStore
Config.java
private String messageDelayLevel = "1s 5s 10s
30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h
2h";

代码测试
生产者

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;public class DelayMessageProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer=new DefaultMQProducer("producer_group");producer.setNamesrvAddr("192.168.66.100:9876");producer.start();Message message=null;for(int i=0; i<20;i++){message=new Message("tp_demo_3",("hello RocketMQ delayMessage -"+i).getBytes());//设置延迟时间0-17 0表示2秒 大于18都是2小时message.setDelayTimeLevel(i);producer.send(message);}producer.shutdown();}
}

消费者

package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;public class DelayMessageConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("transactionConsumer-grp");consumer.setNamesrvAddr("192.168.66.100:9876");System.out.println("==========================================");//设置消息重试次数consumer.setMaxReconsumeTimes(5);//设置可以批量处理consumer.setConsumeMessageBatchMaxSize(1);//订阅主题consumer.subscribe("tp_demo_3","*");consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {System.out.println(System.currentTimeMillis()/1000);for(MessageExt message:list){System.out.println(message.getTopic()+"\t"+message.getQueueId()+"\t"+message.getDelayTimeLevel()+"\t"+new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

运行结果
在这里插入图片描述

消息查询

在这里插入图片描述

在实际开发中,经常需要查看MQ中消息的内容来排查问题。RocketMQ提供了三种消息查询的方式,分别是按Message ID、Message Key以及Unique Key查询。

//返回结果
SendResult [sendStatus=SEND_OK,
msgId=C0A801030D4B18B4AAC247DE4A0D0000,
offsetMsgId=C0A8010300002A9F000000000007BEE9,messageQueue=MessageQueue [topic=TopicA,
brokerName=broker-a, queueId=0],queueOffset=0]
  • 按MessageId查询消息
    Message Id 是消息发送后,在Broker端生成的,其包含了
    Broker的地址、偏移信息,并且会把Message Id作为结果的一
    部分返回。Message Id中属于精确匹配,代表唯一一条消息,
    查询效率更高。

  • 按照Message Key查询消息
    消息的key是开发人员在发送消息之前自行指定的,通常把具有
    业务含义,区分度高的字段作为消息的key,如用户id,订单id
    等。

  • 按照Unique Key查询消息
    除了开发人员指定的消息key,生产者在发送发送消息之前,会
    自动生成一个UNIQ_KEY,设置到消息的属性中,从逻辑上唯一
    代表一条消息

消息在消息队列RocketMQ中存储的时间默认为3天(不建议修
改),即只能查询从消息发送时间算起3天内的消息,三种查询方式
的特点和对比如下表所述:
在这里插入图片描述

二.RocketMQ应用实战

在这里插入图片描述

生产端发送同步消息

同步发送是指消息发送方发出数据后,同步等待,直到收到接收方发回响应之后才发下一个请求
在这里插入图片描述

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;public class SyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, MQBrokerException, RemotingException, InterruptedException {//实例化消息生产者DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("192.168.66.100:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送消息到一个BrokerSendResult sendResult = producer.send(msg);// 通过sendResult返回消息是否成功送达System.out.printf("%s%n", sendResult);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

运行结果

SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2DFEB0000, offsetMsgId=AC12FA2E00002A9F0000000000012E3A, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=1], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0030001, offsetMsgId=AC12FA2E00002A9F0000000000012EF8, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=2], queueOffset=101]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0060002, offsetMsgId=AC12FA2E00002A9F0000000000012FB6, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=3], queueOffset=100]
SendResult [sendStatus=SEND_OK, msgId=7F0000015A3863947C6B98F2E0090003, offsetMsgId=AC12FA2E00002A9F0000000000013074, messageQueue=MessageQueue [topic=TopicTest, brokerName=chenl346-vszbn, queueId=0], queueOffset=99]
... ...

Message ID:消息的全局唯一标识(内部机制的ID生成是使用机器IP和消息偏移量的组成,所以有可能重复,如果是幂等性还是最好考虑Key),由消息队列MQ系统自动生成,唯一标识某条消息。

SendStatus:发送的标识。成功,失败等

Queue:相当于是Topic的分区;用于并行发送和接收消息

发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。在这里插入图片描述
消息发送方在发送了一条消息后,不等接收方发回响应,接着进行第二条消息发送。发送方通过回调接口的方式接收服务器响应,并对响应结果进行处理

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;public class AsyncProducer {public static void main(String[] args) throws MQClientException, UnsupportedEncodingException, RemotingException, InterruptedException {// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("192.168.66.100:9876");// 启动Producer实例producer.start();//消息失败重试次数producer.setRetryTimesWhenSendAsyncFailed(0);int messageCount = 100;// 根据消息数量实例化倒计时计算器final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);//创建消息for(int i=0;i<messageCount;i++){final int index=i;Message msg=new Message("topic_demo","TagA","OrderID188","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));// SendCallback接收异步返回结果的回调producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});}// 等待5scountDownLatch.await(5, TimeUnit.SECONDS);// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

单向发送消息

这种方式主要用在不特别关心发送结果的场景,例如日志发送
在这里插入图片描述
单向(Oneway)发送特点为发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别

package com.jjy.produce;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class OnewayProducer {public static void main(String[] args) throws Exception{// 实例化消息生产者ProducerDefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");// 设置NameServer的地址producer.setNamesrvAddr("192.168.66.100:9876");// 启动Producer实例producer.start();for (int i = 0; i < 100; i++) {// 创建消息,并指定Topic,Tag和消息体Message msg = new Message("TopicTest" /* Topic */,"TagA" /* Tag */,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */);// 发送单向消息,没有任何返回结果producer.sendOneway(msg);}// 如果不再发送消息,关闭Producer实例。producer.shutdown();}
}

消息发送时的权衡

发送方式发送TPS发送结果反馈可靠性使用场景
同步发送可靠邮件、短信、推送
异步发送可靠视频转码
单向发送最快可能丢失日志收集

顺序发送消息

一个订单的顺序流程是:创建、付款、推送、完成。订单号相同的消息会被先后发送到同一个队列中,消费时,同一个OrderId获取到的肯定是同一个队列。

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.List;public class OrderProducer {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");producer.setNamesrvAddr("192.168.66.100:9876");producer.start();// 获取指定主题的MQ列表final List<MessageQueue> messageQueues = producer.fetchPublishMessageQueues("tp_demo_11");Message message = null;MessageQueue messageQueue = null;for (int i = 0; i < 100; i++) {// 采用轮询的方式指定MQ,发送订单消息,保证同一个订单的消息按顺序// 发送到同一个MQmessageQueue = messageQueues.get(i % 8);//创建message对象//发送创建订单消息message = new Message("tp_demo_11", ("hello rocketmq order create - " + i).getBytes());producer.send(message, messageQueue);//发送付款订单消息message = new Message("tp_demo_11", ("hello rocketmq order pay - " + i).getBytes());producer.send(message, messageQueue);//发送订单送货消息message = new Message("tp_demo_11", ("hello rocketmq order delivery - " + i).getBytes());producer.send(message, messageQueue);}producer.shutdown();}
}

消费顺序消息

package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class OrderConsumer {public static void main(String[] args) throws MQClientException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");consumer.setNamesrvAddr("192.168.66.100:9876");//订阅主题consumer.subscribe("tp_demo_11", "*");//最小消费线程数consumer.setConsumeThreadMin(1);//最大消费线程数consumer.setConsumeThreadMax(1);//一次拉取的消息数量consumer.setPullBatchSize(1);//一次消费的消息数量consumer.setConsumeMessageBatchMaxSize(1);// 使用有序消息监听器consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,  ConsumeOrderlyContext context) {for (MessageExt msg : msgs) {System.out.println(msg.getTopic() + "\t" +msg.getQueueId() + "\t" +new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();}
}

全局顺序消息

生产者

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.exception.RemotingException;import java.util.List;public class GlobalOrderProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer = new DefaultMQProducer("producer_grp_02");producer.setNamesrvAddr("192.168.66.100:9876");producer.start();Message message = null;for(int i=0;i<100;i++){message=new Message("tp_demo_11",("全局有序消息...."+i).getBytes());producer.send(message, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> list, Message message, Object o) {return list.get((Integer)0);}},1);}producer.shutdown();}
}

消费者

package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class GlobalConsumer {public static void main(String[] args) throws MQClientException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_02");consumer.setNamesrvAddr("192.168.66.100:9876");//订阅主题consumer.subscribe("tp_demo_11", "*");//最小消费线程数consumer.setConsumeThreadMin(1);//最大消费线程数consumer.setConsumeThreadMax(1);//一次拉取的消息数量consumer.setPullBatchSize(1);//一次消费的消息数量consumer.setConsumeMessageBatchMaxSize(1);consumer.setMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {for (MessageExt msg : list) {System.out.println("消费线程=" + Thread.currentThread().getName() +", queueId=" + msg.getQueueId() + ", 消息内容:" + new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();}
}

延迟消息

生产者

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;public class DelayMessageProducer {public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {DefaultMQProducer producer=new DefaultMQProducer("producer_group");producer.setNamesrvAddr("192.168.66.100:9876");producer.start();Message message=null;for(int i=0; i<20;i++){message=new Message("tp_demo_3",("hello RocketMQ delayMessage -"+i).getBytes());// 设置延迟时间级别0,18,0表示不延迟,18表示延迟2h,大于18的都是2hmessage.setDelayTimeLevel(i);producer.send(message);}producer.shutdown();}
}

消费者

package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;import org.apache.rocketmq.common.message.MessageExt;public class DelayMessageConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer=new DefaultMQPushConsumer("transactionConsumer-grp");consumer.setNamesrvAddr("192.168.66.100:9876");System.out.println("==========================================");//设置消息重试次数consumer.setMaxReconsumeTimes(5);//设置可以批量处理consumer.setConsumeMessageBatchMaxSize(1);//订阅主题consumer.subscribe("tp_demo_3","*");consumer.setMessageListener((MessageListenerConcurrently) (list, consumeConcurrentlyContext) -> {System.out.println(System.currentTimeMillis()/1000);for(MessageExt message:list){System.out.println(message.getTopic()+"\t"+message.getQueueId()+"\t"+message.getDelayTimeLevel()+"\t"+new String(message.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});consumer.start();}
}

事务消息

生产者

package com.jjy.produce;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;public class  TransactionProducer {public static void main(String[] args) throws MQClientException {TransactionListener listener=new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message message, Object o) {// 当发送事务消息prepare(half)成功后,调用该方法执行本地事务System.out.println("执行本地事务......");// return LocalTransactionState.COMMIT_MESSAGE;//使用下面事务回滚 消费端无法接收到消息了try {Thread.sleep(100000);} catch (InterruptedException e) {e.printStackTrace();}return LocalTransactionState.ROLLBACK_MESSAGE;}@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt messageExt) {// 该方法用于获取本地事务执行的状态。System.out.println("检查本地事务的状态:" + messageExt);return LocalTransactionState.COMMIT_MESSAGE;}};//创建事务消息生产者TransactionMQProducer producer=new TransactionMQProducer("producer_grp_01");//设置事务监听器producer.setTransactionListener(listener);producer.setNamesrvAddr("192.168.66.100:9876");producer.start();Message message=null;message=new Message("tp_demo_11","hello translation message".getBytes());producer.sendMessageInTransaction(message,"{\" name\":\"zhansan\"}");}
}

消费者

package com.jjy.consumer;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;import java.util.List;public class TransactionMsgConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("txconsumer_grp_12_01");consumer.setNamesrvAddr("192.168.66.100:9876");consumer.subscribe("tp_demo_11", "*");consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {for (MessageExt msg : msgs) {System.out.println(new String(msg.getBody()));}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();}}

消息查询

package com.itbaizhan.consumer;public class QueryingMessageDemo {public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {//创建消费者对象DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_grp_01");//设置nameserver地址consumer.setNamesrvAddr("192.168.66.100:9876");//设置消息监听器consumer.setMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();//根据messageId查询消息MessageExt message = consumer.viewMessage("topic_springboot_demo_02", "C0A88B8000002A9F000000000000C8E8");System.out.println(message);System.out.println(message.getMsgId());consumer.shutdown();}
}

如果我的内容对你有帮助,请点赞,评论,收藏。创作不易,大家的支持就是我坚持下去的动力!
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/471137.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

不花一分钱,在 Mac 上跑 Windows(M1/M2 版)

这是在 MacOS M1 上体验最新 Windows11 的效果&#xff1a; VMware Fusion&#xff0c;可以运行 Windows、Linux 系统&#xff0c;个人使用 licence 免费 安装流程见 &#x1f449; https://zhuanlan.zhihu.com/p/452412091 从申请 Fusion licence 到下载镜像&#xff0c;再到…

Compose自定义动画API指南

很多动画API都可以自定义其参数达到不同的效果&#xff0c;Compose也提供了相应的API供开发者进行自定义动画规范。 AnimationSpec 主要用存储动画规格&#xff0c;可以自定义动画的行为&#xff0c;在animate*AsState和updateTransition函数中&#xff0c;此函数默认参数为s…

【C++】 为什么多继承子类重写的父类的虚函数地址不同?『 多态调用汇编剖析』

&#x1f440;樊梓慕&#xff1a;个人主页 &#x1f3a5;个人专栏&#xff1a;《C语言》《数据结构》《蓝桥杯试题》《LeetCode刷题笔记》《实训项目》《C》《Linux》《算法》 &#x1f31d;每一个不曾起舞的日子&#xff0c;都是对生命的辜负 前言 本篇文章主要是为了解答有…

《剑指offer》

本专题是分享剑指offer的一些题目&#xff0c;开始刷题计划。 二维数组的中的查找【https://www.nowcoder.com/practice/abc3fe2ce8e146608e868a70efebf62e?tpId13&tqId11154&ru/exam/oj】 描述 在一个二维数组array中&#xff08;每个一维数组的长度相同&#xff0…

关于Django的中间件使用说明。

目录 1.中间件2. 为什么要中间件&#xff1f;3. 具体使用中间件3.1 中间件所在的位置&#xff1a;在django的settings.py里面的MIDDLEWARE。3.2 中间件的创建3.3 中间件的使用 4. 展示成果 1.中间件 中间件的大概解释&#xff1a;在浏览器在请求服务器的时候&#xff0c;首先要…

【JAVA-Day88】Java字符串和JSON对象的转换

Java字符串和JSON对象的转换 Java字符串和JSON对象的转换摘要引言一、什么是JSON二、JSON的应用场景三、JSON对象转字符串3.1 使用 Jackson 库实现 JSON 对象转字符串3.2 使用 Gson 库实现 JSON 对象转字符串 四、JSON字符串转对象4.1 使用 Jackson 库实现 JSON 字符串转对象4.…

Editing While Playing 使用 Easyx 开发的 RPG 地图编辑器 tilemap eaitor

AWSD移动画布 鼠标右键长按拖拽 鼠标左键长按绘制 可以边拖拽边移动画布边绘制。 F1 导出 DLC F2 导入DLC author: 民用级脑的研发记录 1309602336qq.com 开发环境&#xff1a; 内置 easyx 的 devc 5.11 或者 VS 2022 TDM GCC 4.9.2 64-bit c11及以上都可运行 windows 环境运行…

通过深度学习和人脸图像进行年龄段估计matlab仿真

目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 4.1深度学习网络 4.2 人脸特征提取 4.3 回归模型构建 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 MATLAB2022a 3.部分核心程序 ..................................…

平时积累的FPGA知识点(8)

平时在FPGA群聊等积累的FPGA知识点&#xff0c;第八期&#xff1a; 21 FFT IP核 有遇到过FFT IP核测量频率不准确的问题吗&#xff1f;大部分情况下都是准的&#xff0c;偶尔偏差比较大&#xff0c;IP核输入的数据用matlab计算出的频率是对的。 解释&#xff1a;可能是采样点…

docker (三)-开箱即用常用命令

一 docker架构 拉取镜像仓库中的镜像到本地&#xff0c;镜像运行产生一个容器 registry 镜像仓库 registry可以理解为镜像仓库&#xff0c;用于保存docker image。 Docker Hub 是docker官方的镜像仓库&#xff0c;docker命令默认从docker hub中拉取镜像。我们也可以搭建自己…

数据结构——6.1 图的基本概念

第六章 图 6.1 图的基本概念 概念 图的概念&#xff1a;G由点集V和边集E构成&#xff0c;记为G(V,E)&#xff0c;边集可以为空&#xff0c;但是点集不能为空 注意&#xff1a;线性表可以是空表&#xff0c;树可以是空树&#xff0c;但图不可以是空&#xff0c;即V一定是非空集…

Pr教程1-8节笔记

第一课 认识PR以及PR的学习方法 学习任务&#xff1a; 1、熟练掌握PR软件&#xff0c;同时掌握剪辑技术以及常用于制作特效的效果器。 2、认识PR软件的名称、主要功能以及用途作用。 3、明白学习PR我们能做些什么以及PR的学习方法。 知识内容&#xff1a; 1、PR是专门用于视…