RocketMQ Learning(二)

目录

一、RocketMQ

1、顺序消息

2、延时消息

3、批量消息

批量切分发送

4、消息的过滤

Tag过滤

Sql过滤

5、分布式事务消息

6、Request-Reply消息

7、死信队列


一、RocketMQ

1、顺序消息

        消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。区别如下:
        生产消息时在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。

全局有序

全局有序比较简单,主要控制在于创建Topic指定只有一个队列,同步确保生产者与消费者都只有一个实例进行即可。

 分区有序

生产者-->源码example模块下的ordermessage下

package org.apache.rocketmq.example.ordermessage;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.List;public class Producer {public static void main(String[] args) throws MQClientException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (Exception e) {e.printStackTrace();throw new MQClientException(e.getMessage(), null);}}
}

 消费者

package org.apache.rocketmq.example.ordermessage;import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;} else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}}

2、延时消息

        延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递(被消费者消费),而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。

        适用场景:消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时向RocketMQ发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。

        延迟消息的level,区分18个等级:level为1,表示延迟1秒后消费;level为2表示延迟5秒后消费;level为3表示延迟10秒后消费;以此类推;最大level为18表示延迟2个小时消费。具体标识如下:

level

1

2

3

4

5

6

7

8

9

延迟

1s

5s

10s

30s

1m

2m

3m

4m

5m

level

10

11

12

13

14

15

16

17

18

延迟

6m

7m

8m

9m

10m

20m

30m

1h

2h

生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level即可。消费消息跟普通的消费消息一致。

生产者

package org.apache.rocketmq.example.schedule;import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class ScheduledMessageProducer {public static final String PRODUCER_GROUP = "ExampleProducerGroup";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "TestTopic";public static void main(String[] args) throws Exception {// Instantiate a producer to send scheduled messagesDefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);//设置NameServ的地址producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);// Launch producerproducer.start();int totalMessagesToSend = 10;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));// This message will be delivered to consumer 10 seconds later.message.setDelayTimeLevel(3);// Send the messageSendResult result = producer.send(message);System.out.print(result);}// Shutdown producer after use.producer.shutdown();}}

消费者

package org.apache.rocketmq.example.schedule;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class ScheduledMessageConsumer {public static final String CONSUMER_GROUP = "ExampleConsumer";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "TestTopic";public static void main(String[] args) throws Exception {// Instantiate message consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);// Uncomment the following line while debugging, namesrvAddr should be set to your local addressconsumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);// Subscribe topicsconsumer.subscribe(TOPIC, "*");// Register message listenerconsumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {for (MessageExt message : messages) {// Print approximate delay time period//打印消息消费延迟System.out.printf("Receive message[msgId=%s %d  ms later]\n", message.getMsgId(),System.currentTimeMillis() - message.getStoreTimestamp());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// Launch consumerconsumer.start();//info:to see the time effect, run the consumer first , it will wait for the msg//then start the producer}
}

查看消费者消息信息

Receive message[msgId=C0A800AB351018B4AAC2293131720003 500  ms later]
Receive message[msgId=C0A800AB351018B4AAC22931315C0000 501  ms later]
Receive message[msgId=C0A800AB351018B4AAC22931316F0002 500  ms later]
Receive message[msgId=C0A800AB351018B4AAC2293131680001 500  ms later]
Receive message[msgId=C0A800AB351018B4AAC22931317B0007 500  ms later]
Receive message[msgId=C0A800AB351018B4AAC2293131790006 500  ms later]
Receive message[msgId=C0A800AB351018B4AAC2293131770005 500  ms later]
Receive message[msgId=C0A800AB351018B4AAC22931317F0009 500  ms later]
Receive message[msgId=C0A800AB351018B4AAC2293131740004 500  ms later]
Receive message[msgId=C0A800AB351018B4AAC22931317D0008 500  ms later]

3、批量消息

        在高并发场景中,批量发送消息能显著提高传递消息发送时的性能(减少网络连接及IO的开销)。使用批量消息时的限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK(集群时会细讲),且不能是延时消息。
        在发送批量消息时先构建一个消息对象集合,然后调用send(Collection msg)系列的方法即可。由于批量消息的4MB限制,所以一般情况下在集合中添加消息需要先计算当前集合中消息对象的大小是否超过限制,如果超过限制也可以使用分割消息的方式进行多次批量发送。

        因为批量消息是一个Collection,所以送入消息可以是List,也可以使Set,这里为方便起见,使用List进行批量组装发送。

package org.apache.rocketmq.example.batch;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class SimpleBatchProducer {public static final String PRODUCER_GROUP = "BatchProducerGroupName";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "BatchTest";public static final String TAG = "Tag";public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);// Uncomment the following line while debugging, namesrvAddr should be set to your local addressproducer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();//If you just send messages of no more than 1MiB at a time, it is easy to use batch//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule supportList<Message> messages = new ArrayList<>();messages.add(new Message(TOPIC, TAG, "OrderID001", "Hello world 0".getBytes(StandardCharsets.UTF_8)));messages.add(new Message(TOPIC, TAG, "OrderID002", "Hello world 1".getBytes(StandardCharsets.UTF_8)));messages.add(new Message(TOPIC, TAG, "OrderID003", "Hello world 2".getBytes(StandardCharsets.UTF_8)));SendResult sendResult = producer.send(messages);System.out.printf("%s", sendResult);}
}

批量切分发送

        如果消息的总长度可能大于4MB时,这时候最好把消息进行分割,案例中以1M大小进行消息分割。
        我们需要发送10万元素的数组,这个量很大,怎么快速发送完。使用批量发送,同时每一批控制在1M左右确保不超过消息大小限制。

package org.apache.rocketmq.example.batch;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class SplitBatchProducer {public static final String PRODUCER_GROUP = "BatchProducerGroupName";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final int MESSAGE_COUNT = 100 * 1000;public static final String TOPIC = "BatchTest";public static final String TAG = "Tag";public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);// Uncomment the following line while debugging, namesrvAddr should be set to your local addressproducer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();//large batchList<Message> messages = new ArrayList<>(MESSAGE_COUNT);for (int i = 0; i < MESSAGE_COUNT; i++) {messages.add(new Message(TOPIC, TAG, "OrderID" + i, ("Hello world " + i).getBytes(StandardCharsets.UTF_8)));}//split the large batch into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {List<Message> listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.printf("%s", sendResult);}}}class ListSplitter implements Iterator<List<Message>> {private static final int SIZE_LIMIT = 1000 * 1000;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}@Overridepublic List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}//for log overheadtmpSize = tmpSize + 20;if (tmpSize > SIZE_LIMIT) {//it is unexpected that single message exceeds the sizeLimit//here just let it go, otherwise it will block the splitting processif (nextIndex - currIndex == 0) {//if the next sublist has no element, add this one and then break, otherwise just breaknextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}@Overridepublic void remove() {throw new UnsupportedOperationException("Not allowed to remove");}
}

4、消息的过滤

        在实际的开发应用中,对于一类消息尽可能使用一个Topic进行存储,但在消费时需要选择您想要的消息,这时可以使用RocketMQ的消息过滤功能,具体实现是利用消息的Tag和Key。
        Key一般用于消息在业务层面的唯一标识。对发送的消息设置好 Key,以后可以根据这个 Key 来查找消息。比如消息异常,消息丢失,进行查找会很方便。RocketMQ 会创建专门的索引文件,用来存储 Key与消息的映射,由于底层实现是 Hash 索引,应尽量使 Key唯一,避免潜在的哈希冲突。
        Tag可以理解为是二级分类。以淘宝交易平台为例,订单消息和支付消息属于不同业务类型的消息,分别创建OrderTopic 和PayTopic,其中订单消息根据不同的商品品类以不同的 Tag 再进行细分,如手机类、家电类、男装类、女装类、化妆品类,最后它们都被各个不同的系统所接收。通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。
        Key和Tag的主要差别是使用场景不同,Key主要用于通过命令行命令查询消息,而Tag用于在消息端的代码中,用来进行服务端消息过滤。

Tag过滤

使用Tag过滤的方式是在消息生产时传入感兴趣的Tag标签,然后在消费端就可以根据Tag来选择您想要的消息。具体的操作是在创建Message的时候添加,一个Message只能有一个Tag。

package org.apache.rocketmq.example.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class TagFilterProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 60; i++) {Message msg = new Message("TagFilterTest",tags[i % tags.length],"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();}
}

消费者消费时,只选择TagA和TagC的消息。

package org.apache.rocketmq.example.filter;import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;public class TagFilterConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("TagFilterTest", "TagA || TagC");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

注意事项

Tag过滤的形式非常简单,||代表或、*代表所有,所以使用Tag过滤这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。

Sql过滤

SQL特性可以通过发送消息时的属性来进行消息的过滤计算。具体的操作是使用SQL92标准的sql语句,前提是只有使用push模式的消费者才能用(消费的模式就是push)

SQL基本语法

数值比较:比如:>,>=,<,<=,BETWEEN,=;

字符比较:比如:=,<>,IN;

IS NULL 或者 IS NOT NULL;

逻辑符号:AND,OR,NOT;

常量支持类型为:

数值,比如:123,3.1415;

字符,比如:'abc',必须用单引号包裹起来;

NULL,特殊的常量

布尔值,TRUE 或 FALSE

注意事项

        Sql过滤需要Broker开启这项功能(如果消费时使用SQL过滤抛出异常错误,说明Sql92功能没有开启),需要修改Broker.conf配置文件。加入enablePropertyFilter=true 然后重启Broker服务。

package org.apache.rocketmq.example.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class SqlFilterProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 10; i++) {Message msg = new Message("SqlFilterTest",tags[i % tags.length],("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//设置sql过滤的属性msg.putUserProperty("a", String.valueOf(i));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();}
}

用MessageSelector.bySql来使用sql筛选消息

package org.apache.rocketmq.example.filter;import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class SqlFilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");consumer.setNamesrvAddr("127.0.0.1:9876");// Don't forget to set enablePropertyFilter=true in brokerconsumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +"and (a is not null and a between 0 and 3)"));consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}

5、分布式事务消息

业务场景:用户A转账100元给用户B,这个业务比较简单,具体的步骤:
1、用户A的账户先扣除100元 
2、再把用户B的账户加100元

RocketMq把消息分为两个阶段:半事务阶段和确认阶段
半事务阶段:
该阶段主要发一个消息到rocketmq,但该消息只储存在commitlog中,但consumeQueue中不可见,也就是消费端(订阅端)无法看到此消息
确认阶段(commit/rollback):
该阶段主要是把半事务消息保存到consumeQueue中,即让消费端可以看到此消息,也就是可以消费此消息。如果是rollback就不保存。

 整个流程:
    1、A在扣款之前,先发送半事务消息
    2、发送预备消息成功后,执行本地扣款事务
    3、扣款成功后,再发送确认消息
    4、B消息端(加钱业务)可以看到确认消息,消费此消息,进行加钱
注意:上面的确认消息可以为commit消息,可以被订阅者消费;也可以是Rollback消息,即执行本地扣款事务失败后,提交rollback消息,即删除那个半事务消息,订阅者无法消费。这样就可以解决以下异常问题:
    异常1:如果发送半事务消息失败,下面的流程不会走下去,这个是正常的。
    异常2:如果发送半事务消息成功,但执行本地事务失败。这个也没有问题,因为此半事务消息不会被消费端订阅到,消费端不会执行业务。 
    异常3:如果发送半事务消息成功,执行本地事务成功,但发送确认消息失败;这个就有问题了,因为用户A扣款成功了,但加钱业务没有订阅到确认消息,无法加钱。这里出现了数据不一致。

        RocketMq如何解决上面的问题,核心思路就是【事务回查】,也就是RocketMq会定时遍历commitlog中的半事务消息。
        对于异常3,发送半事务消息成功,本地扣款事务成功,但发送确认消息失败;因为RocketMq会进行回查半事务消息,在回查后发现业务已经扣款成功了,就补发“发送commit确认消息”;这样加钱业务就可以订阅此消息了。
        这个思路其实把异常2也解决了,如果本地事务没有执行成功,RocketMQ回查业务,发现没有执行成功,就会发送RollBack确认消息,把消息进行删除。
        同时还要注意的点是,RocketMQ不能保障消息的重复,所以在消费端一定要做幂等性处理。
除此之外,如果消费端发生消费失败,同时也需要做重试,如果重试多次,消息会进入死信队列,这个时候也需要进行特殊的处理。(一般就是把A已经处理完的业务进行回退)

package org.apache.rocketmq.example.transaction;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class TransactionProducer {public static final String PRODUCER_GROUP = "please_rename_unique_group_name";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "TopicTest1234";public static final int MESSAGE_COUNT = 10;public static void main(String[] args) throws MQClientException, InterruptedException {TransactionListener transactionListener = new TransactionListenerImpl();//创建事务监听器TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);//创建线程池ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});producer.setExecutorService(executorService);//设置生产者回查线程池producer.setTransactionListener(transactionListener);//生产者设置事务回查监听器producer.start();//启动消息生产者String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < MESSAGE_COUNT; i++) {try {Message msg =new Message(TOPIC, tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult.getSendStatus());//半事务消息是否成功Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {//等待Thread.sleep(1000);}producer.shutdown();}
}

事务回查案例代码

package org.apache.rocketmq.example.transaction;import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;public class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();//执行本地事务@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}//事务回查,默认60秒检查一次@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;default:return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}
}

1、事务消息不支持延时消息和批量消息。
2、事务回查的间隔时间:BrokerConfig. transactionCheckInterval  通过Broker的配置文件设置好。
3、为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。、
4、事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
5、事务性消息可能不止一次被检查或消费。
6、事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
7、提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
8、事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。

6、Request-Reply消息

什么是Request-Reply?
RocketMQ 中"Request-Reply"模式允许Producer发出消息后,以同步或异步的形式等Consumer消费并返回一个响应消息,达到类似RPC的调用过程。
RocketMQ从4.6.0版本开始支持这种模式。这种模式的流程如下图:

 与RPC的不同
RocketMQ的这种调用方式跟dubbo之类的RPC调用非常类似,那为什么不使用dubbo?而要使用RocketMQ的这种RPC调用呢?原因如下:
  基于RocketMQ来实现RPC可以快速搭建服务的消息总线,实现自己的RPC框架。
  基于RocketMQ来实现RPC可以方便的收集调用的相关信息,能够实现调用链路追踪和分析。
  基于RocketMQ来实现RPC既可以解耦两个系统之间的依赖,也可以实现跨网络区域实现系统间的同步调用,这里RocketMQ扮演的是一个类似于网关的角色。

Request-Reply的实现逻辑

        在以上图中,可以看到,使用RPC模式还是三方:Producer、Broker、Consumer。
        在Producer中进行消息的发送时,可以随便指定Topic,但是需要送入reply_to_client、correlation_id两个关键信息,reply_to_client记录着请求方的clientlD(用于Broker响应时确定client端)。而correlation_id是标识每次请求的,用于响应消息与请求的配对。而在进行发送消息时,也有两种模式,一种同步阻塞,另外一种异步非阻塞,这些跟之前普通消息的三种发送方式类似。
        Broker端除了Producer发送时指定的Topic之外,还有一个Reply_Topic,这个以集群名_REPLY_TOPIC命名(不管RPC生产者主题有多少,这个在一个集群中只有一个),主要用于Consumer响应RPC消息的路由发现。
        Consumer端除了消费监听之外,还需要加入一个消息的生产(用于RPC的响应消息),必须使用客户端提供的MessageUtil进行消息的包装,防止关键信息丢失从而导致Producer不能收到RPC消息响应。

package org.apache.rocketmq.example.rpc;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class RequestProducer {public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "please_rename_unique_group_name";String topic = "RequestTopic";long ttl = 3000;//超时时间DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();Message retMsg = producer.request(msg, ttl);long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);} catch (Exception e) {e.printStackTrace();}producer.shutdown();}
}

package org.apache.rocketmq.example.rpc;import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;public class ResponseConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {String producerGroup = "please_rename_unique_group_name";String consumerGroup = "please_rename_unique_group_name";String topic = "RequestTopic";// create a producer to send reply messageDefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);replyProducer.setNamesrvAddr("127.0.0.1:9876");replyProducer.start();// create consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.setNamesrvAddr("127.0.0.1:9876");// recommend client configsconsumer.setPullTimeDelayMillsWhenException(0L);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);for (MessageExt msg : msgs) {try {System.out.printf("handle message: %s %n", msg.toString());String replyTo = MessageUtil.getReplyToClient(msg);byte[] replyContent = "reply message contents.".getBytes(StandardCharsets.UTF_8);// create reply message with given util, do not create reply message by yourselfMessage replyMessage = MessageUtil.createReplyMessage(msg, replyContent);// send reply message with producerSendResult replyResult = replyProducer.send(replyMessage, 3000);System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.subscribe(topic, "*");consumer.start();System.out.printf("Consumer Started.%n");}
}

7、死信队列

        当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
        在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。

死信特性
死信消息具有以下特性:
        不会再被消费者正常消费。
        有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
死信队列具有以下特性:
        不会再被消费者正常消费。
        一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
        如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
        一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。

RocketMQ Learning(一)

不是你觉的悟到的东西给了你,你也接不住!

干我们这行,啥时候懈怠,就意味着长进的停止,长进的停止就意味着被淘汰,只能往前冲,直到凤凰涅槃的一天!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/68457.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

自定义注解(Annontation)

目录 1.注解定义 2.元注解定义 3. 自定义注解&#xff08;自定义的注解名称相同的会覆盖原注解&#xff09; 4.Annotation架构&#xff08;元注解参数介绍&#xff09; 1.注解定义 注解是用来将任何的信息或元数据&#xff08;metadata&#xff09;与程序元素&#xff08;类…

ubuntu 如何命令行打开系统设置(Wifi,网络,应用程序...)

关于GNOME GNOME 是一个自由、开放源代码的桌面环境&#xff0c;它运行在 Linux 和其他类 UNIX 操作系统上。它是 GNU 项目的一部分&#xff0c;旨在为 Linux 操作系统提供一个现代化、易于使用的用户界面。 GNOME 桌面环境包括许多应用程序&#xff0c;例如文件管理器、文本编…

Redis数据结构——链表list

链表是一种常用的数据结构&#xff0c;提供了顺序访问的方式&#xff0c;而且高效地增删操作。 Redis中广泛使用了链表&#xff0c;例如&#xff1a;列表的底层实现之一就是链表。 在Redis中&#xff0c;链表分为两部分&#xff1a;链表信息 链表节点。 链表节点用来表示链表…

git 配置网络代理

提高 git 访问 github 速度 网络代理前提: 请开启代理&#xff08;梯子&#xff09;检查代理端口&#xff08;可能会有所不同&#xff09; 文章目录 1. git 配置参数列表命令&#xff1a;2. git 添加 http 代理3. git 取消 http 代理 1. git 配置参数列表命令&#xff1a; gi…

轻量级 Spring Task 任务调度可视化管理

Spring Task/Spring Scheduler 傻傻分不清 首先做一下“名词解释”&#xff0c;分清楚这两者的区别&#xff1a; Spring Task Spring Task 是 Spring 框架自带的一个任务调度模块&#xff0c;提供了基本的任务调度功能。它是通过 Java 的 Timer 和 TimerTask 类来实现的&…

2023年游戏买量能怎么玩?

疫情过后&#xff0c;一地鸡毛。游戏行业的日子也不好过。来看看移动游戏收入&#xff1a;2022年&#xff0c;移动游戏收入达到920亿美元&#xff0c;同比下降6.4%。这告诉我们&#xff0c;2022年对移动游戏市场来说是一个小挫折。 但不管是下挫还是上升&#xff0c;移动游戏市…

Martin_DHCP_V3.0 (DHCP自动化泛洪攻击GUI)

Github>https://github.com/MartinxMax/Martin_DHCP_V3.0 首页 Martin_DHCP_V3.0 自动化DHCP洪泛攻击 Martin_DHCP_V3.0 使用方法 安装三方库 #python3 1.RunMe_Install_Packet.py 攻击路由器 #python3 Martin_DHCP_Attack.py 填写网卡 填写攻击次数 开始运行

ASPICE学习笔记

文章目录 1. ASPICE是什么?2. ASPICE能干什么?2.1 过程参考模型2.2 过程评估模型参考1. ASPICE是什么? ASPICE的全称是Automotive SPICE。很明显的看出ASPICE是由SPICE发展而来。而SPICE是由国际标准化组织ISO、国际电工委员会IEC、信息技术委员会JTC1发起制定的ISO15504标…

轻装上阵,不调用jar包,用C#写SM4加密算法【卸载IKVM 】

前言 记得之前写了一个文章,是关于java和c#加密不一致导致需要使用ikvm的方式来进行数据加密,主要是ikvm把打包后的jar包打成dll包,然后Nuget引入ikvm,从而实现算法的统一,这几天闲来无事,网上找了一下加密库【BouncyCastle.dll】进行加密,目的是想统一加密。因为ikvm相…

HTML详解连载(5)

HTML详解连载&#xff08;5&#xff09; 专栏链接 [link](http://t.csdn.cn/xF0H3)下面进行专栏介绍 开始喽行高&#xff1a;设置多行文本的间距属性名属性值行高的测量方法 行高-垂直居中技巧 字体族属性名属性值示例扩展 font 复合属性使用场景复合属性示例注意 文本缩进属性…

广告聚合平台能为APP开发者提供哪些帮助

应用变现平台是帮助开发者优化广告策略并最终获得更多收入的综合途径。在广告变现过程中&#xff0c;接入单一的广告联盟&#xff0c;变现效率不高&#xff0c;并且开发者需要花费许多精力进行筛选和管理&#xff0c;难免会应接不暇&#xff0c;而聚合广告平台的出现则一定程度…

一百五十二、Kettle——Kettle9.3.0本地连接Hive3.1.2(踩坑,亲测有效,附截图)

一、目的 由于先前使用的kettle8.2版本在Linux上安装后&#xff0c;创建共享资源库点击connect时页面为空&#xff0c;后来采用如下方法&#xff0c;在/opt/install/data-integration/ui/menubar.xul文件里添加如下代码 <menuitem id"file-openZiyuanku" label&…