目录
一、RocketMQ
1、顺序消息
2、延时消息
3、批量消息
批量切分发送
4、消息的过滤
Tag过滤
Sql过滤
5、分布式事务消息
6、Request-Reply消息
7、死信队列
一、RocketMQ
1、顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为分区有序或者全局有序。区别如下:
生产消息时在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
全局有序
全局有序比较简单,主要控制在于创建Topic指定只有一个队列,同步确保生产者与消费者都只有一个实例进行即可。
分区有序
生产者-->源码example模块下的ordermessage下
package org.apache.rocketmq.example.ordermessage;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.MessageQueueSelector;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.util.List;public class Producer {public static void main(String[] args) throws MQClientException {try {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < 100; i++) {int orderId = i % 10;Message msg =new Message("TopicTestjjj", tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer id = (Integer) arg;int index = id % mqs.size();return mqs.get(index);}}, orderId);System.out.printf("%s%n", sendResult);}producer.shutdown();} catch (Exception e) {e.printStackTrace();throw new MQClientException(e.getMessage(), null);}}
}
消费者
package org.apache.rocketmq.example.ordermessage;import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;public class Consumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_3");consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);consumer.subscribe("TopicTest", "TagA || TagC || TagD");consumer.registerMessageListener(new MessageListenerOrderly() {AtomicLong consumeTimes = new AtomicLong(0);@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);this.consumeTimes.incrementAndGet();if ((this.consumeTimes.get() % 2) == 0) {return ConsumeOrderlyStatus.SUCCESS;} else if ((this.consumeTimes.get() % 5) == 0) {context.setSuspendCurrentQueueTimeMillis(3000);return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;}return ConsumeOrderlyStatus.SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}}
2、延时消息
延时消息:Producer 将消息发送到消息队列 RocketMQ 服务端,但并不期望这条消息立马投递(被消费者消费),而是延迟一定时间后才投递到 Consumer 进行消费,该消息即延时消息。
适用场景:消息生产和消费有时间窗口要求:比如在电商交易中超时未支付关闭订单的场景,在订单创建时向RocketMQ发送一条延时消息。这条消息将会在 30 分钟以后投递给消费者,消费者收到此消息后需要判断对应的订单是否已完成支付。 如支付未完成,则关闭订单。如已完成支付则忽略。
延迟消息的level,区分18个等级:level为1,表示延迟1秒后消费;level为2表示延迟5秒后消费;level为3表示延迟10秒后消费;以此类推;最大level为18表示延迟2个小时消费。具体标识如下:
level | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 |
延迟 | 1s | 5s | 10s | 30s | 1m | 2m | 3m | 4m | 5m |
level | 10 | 11 | 12 | 13 | 14 | 15 | 16 | 17 | 18 |
延迟 | 6m | 7m | 8m | 9m | 10m | 20m | 30m | 1h | 2h |
生产消息跟普通的生产消息类似,只需要在消息上设置延迟队列的level即可。消费消息跟普通的消费消息一致。
生产者
package org.apache.rocketmq.example.schedule;import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class ScheduledMessageProducer {public static final String PRODUCER_GROUP = "ExampleProducerGroup";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "TestTopic";public static void main(String[] args) throws Exception {// Instantiate a producer to send scheduled messagesDefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);//设置NameServ的地址producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);// Launch producerproducer.start();int totalMessagesToSend = 10;for (int i = 0; i < totalMessagesToSend; i++) {Message message = new Message(TOPIC, ("Hello scheduled message " + i).getBytes(StandardCharsets.UTF_8));// This message will be delivered to consumer 10 seconds later.message.setDelayTimeLevel(3);// Send the messageSendResult result = producer.send(message);System.out.print(result);}// Shutdown producer after use.producer.shutdown();}}
消费者
package org.apache.rocketmq.example.schedule;import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class ScheduledMessageConsumer {public static final String CONSUMER_GROUP = "ExampleConsumer";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "TestTopic";public static void main(String[] args) throws Exception {// Instantiate message consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer(CONSUMER_GROUP);// Uncomment the following line while debugging, namesrvAddr should be set to your local addressconsumer.setNamesrvAddr(DEFAULT_NAMESRVADDR);// Subscribe topicsconsumer.subscribe(TOPIC, "*");// Register message listenerconsumer.registerMessageListener((MessageListenerConcurrently) (messages, context) -> {for (MessageExt message : messages) {// Print approximate delay time period//打印消息消费延迟System.out.printf("Receive message[msgId=%s %d ms later]\n", message.getMsgId(),System.currentTimeMillis() - message.getStoreTimestamp());}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;});// Launch consumerconsumer.start();//info:to see the time effect, run the consumer first , it will wait for the msg//then start the producer}
}
查看消费者消息信息
Receive message[msgId=C0A800AB351018B4AAC2293131720003 500 ms later]
Receive message[msgId=C0A800AB351018B4AAC22931315C0000 501 ms later]
Receive message[msgId=C0A800AB351018B4AAC22931316F0002 500 ms later]
Receive message[msgId=C0A800AB351018B4AAC2293131680001 500 ms later]
Receive message[msgId=C0A800AB351018B4AAC22931317B0007 500 ms later]
Receive message[msgId=C0A800AB351018B4AAC2293131790006 500 ms later]
Receive message[msgId=C0A800AB351018B4AAC2293131770005 500 ms later]
Receive message[msgId=C0A800AB351018B4AAC22931317F0009 500 ms later]
Receive message[msgId=C0A800AB351018B4AAC2293131740004 500 ms later]
Receive message[msgId=C0A800AB351018B4AAC22931317D0008 500 ms later]
3、批量消息
在高并发场景中,批量发送消息能显著提高传递消息发送时的性能(减少网络连接及IO的开销)。使用批量消息时的限制是这些批量消息应该有相同的topic,相同的waitStoreMsgOK(集群时会细讲),且不能是延时消息。
在发送批量消息时先构建一个消息对象集合,然后调用send(Collection msg)系列的方法即可。由于批量消息的4MB限制,所以一般情况下在集合中添加消息需要先计算当前集合中消息对象的大小是否超过限制,如果超过限制也可以使用分割消息的方式进行多次批量发送。
因为批量消息是一个Collection,所以送入消息可以是List,也可以使Set,这里为方便起见,使用List进行批量组装发送。
package org.apache.rocketmq.example.batch;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class SimpleBatchProducer {public static final String PRODUCER_GROUP = "BatchProducerGroupName";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "BatchTest";public static final String TAG = "Tag";public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);// Uncomment the following line while debugging, namesrvAddr should be set to your local addressproducer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();//If you just send messages of no more than 1MiB at a time, it is easy to use batch//Messages of the same batch should have: same topic, same waitStoreMsgOK and no schedule supportList<Message> messages = new ArrayList<>();messages.add(new Message(TOPIC, TAG, "OrderID001", "Hello world 0".getBytes(StandardCharsets.UTF_8)));messages.add(new Message(TOPIC, TAG, "OrderID002", "Hello world 1".getBytes(StandardCharsets.UTF_8)));messages.add(new Message(TOPIC, TAG, "OrderID003", "Hello world 2".getBytes(StandardCharsets.UTF_8)));SendResult sendResult = producer.send(messages);System.out.printf("%s", sendResult);}
}
批量切分发送
如果消息的总长度可能大于4MB时,这时候最好把消息进行分割,案例中以1M大小进行消息分割。
我们需要发送10万元素的数组,这个量很大,怎么快速发送完。使用批量发送,同时每一批控制在1M左右确保不超过消息大小限制。
package org.apache.rocketmq.example.batch;import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;public class SplitBatchProducer {public static final String PRODUCER_GROUP = "BatchProducerGroupName";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final int MESSAGE_COUNT = 100 * 1000;public static final String TOPIC = "BatchTest";public static final String TAG = "Tag";public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer(PRODUCER_GROUP);// Uncomment the following line while debugging, namesrvAddr should be set to your local addressproducer.setNamesrvAddr(DEFAULT_NAMESRVADDR);producer.start();//large batchList<Message> messages = new ArrayList<>(MESSAGE_COUNT);for (int i = 0; i < MESSAGE_COUNT; i++) {messages.add(new Message(TOPIC, TAG, "OrderID" + i, ("Hello world " + i).getBytes(StandardCharsets.UTF_8)));}//split the large batch into small ones:ListSplitter splitter = new ListSplitter(messages);while (splitter.hasNext()) {List<Message> listItem = splitter.next();SendResult sendResult = producer.send(listItem);System.out.printf("%s", sendResult);}}}class ListSplitter implements Iterator<List<Message>> {private static final int SIZE_LIMIT = 1000 * 1000;private final List<Message> messages;private int currIndex;public ListSplitter(List<Message> messages) {this.messages = messages;}@Overridepublic boolean hasNext() {return currIndex < messages.size();}@Overridepublic List<Message> next() {int nextIndex = currIndex;int totalSize = 0;for (; nextIndex < messages.size(); nextIndex++) {Message message = messages.get(nextIndex);int tmpSize = message.getTopic().length() + message.getBody().length;Map<String, String> properties = message.getProperties();for (Map.Entry<String, String> entry : properties.entrySet()) {tmpSize += entry.getKey().length() + entry.getValue().length();}//for log overheadtmpSize = tmpSize + 20;if (tmpSize > SIZE_LIMIT) {//it is unexpected that single message exceeds the sizeLimit//here just let it go, otherwise it will block the splitting processif (nextIndex - currIndex == 0) {//if the next sublist has no element, add this one and then break, otherwise just breaknextIndex++;}break;}if (tmpSize + totalSize > SIZE_LIMIT) {break;} else {totalSize += tmpSize;}}List<Message> subList = messages.subList(currIndex, nextIndex);currIndex = nextIndex;return subList;}@Overridepublic void remove() {throw new UnsupportedOperationException("Not allowed to remove");}
}
4、消息的过滤
在实际的开发应用中,对于一类消息尽可能使用一个Topic进行存储,但在消费时需要选择您想要的消息,这时可以使用RocketMQ的消息过滤功能,具体实现是利用消息的Tag和Key。
Key一般用于消息在业务层面的唯一标识。对发送的消息设置好 Key,以后可以根据这个 Key 来查找消息。比如消息异常,消息丢失,进行查找会很方便。RocketMQ 会创建专门的索引文件,用来存储 Key与消息的映射,由于底层实现是 Hash 索引,应尽量使 Key唯一,避免潜在的哈希冲突。
Tag可以理解为是二级分类。以淘宝交易平台为例,订单消息和支付消息属于不同业务类型的消息,分别创建OrderTopic 和PayTopic,其中订单消息根据不同的商品品类以不同的 Tag 再进行细分,如手机类、家电类、男装类、女装类、化妆品类,最后它们都被各个不同的系统所接收。通过合理的使用 Topic 和 Tag,可以让业务结构清晰,更可以提高效率。
Key和Tag的主要差别是使用场景不同,Key主要用于通过命令行命令查询消息,而Tag用于在消息端的代码中,用来进行服务端消息过滤。
Tag过滤
使用Tag过滤的方式是在消息生产时传入感兴趣的Tag标签,然后在消费端就可以根据Tag来选择您想要的消息。具体的操作是在创建Message的时候添加,一个Message只能有一个Tag。
package org.apache.rocketmq.example.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class TagFilterProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 60; i++) {Message msg = new Message("TagFilterTest",tags[i % tags.length],"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();}
}
消费者消费时,只选择TagA和TagC的消息。
package org.apache.rocketmq.example.filter;import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;public class TagFilterConsumer {public static void main(String[] args) throws MQClientException {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");consumer.setNamesrvAddr("127.0.0.1:9876");consumer.subscribe("TagFilterTest", "TagA || TagC");consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}
注意事项
Tag过滤的形式非常简单,||代表或、*代表所有,所以使用Tag过滤这对于复杂的场景可能不起作用。在这种情况下,可以使用SQL表达式筛选消息。
Sql过滤
SQL特性可以通过发送消息时的属性来进行消息的过滤计算。具体的操作是使用SQL92标准的sql语句,前提是只有使用push模式的消费者才能用(消费的模式就是push)
SQL基本语法
数值比较:比如:>,>=,<,<=,BETWEEN,=;
字符比较:比如:=,<>,IN;
IS NULL 或者 IS NOT NULL;
逻辑符号:AND,OR,NOT;
常量支持类型为:
数值,比如:123,3.1415;
字符,比如:'abc',必须用单引号包裹起来;
NULL,特殊的常量
布尔值,TRUE 或 FALSE
注意事项
Sql过滤需要Broker开启这项功能(如果消费时使用SQL过滤抛出异常错误,说明Sql92功能没有开启),需要修改Broker.conf配置文件。加入enablePropertyFilter=true 然后重启Broker服务。
package org.apache.rocketmq.example.filter;import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class SqlFilterProducer {public static void main(String[] args) throws Exception {DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("127.0.0.1:9876");producer.start();String[] tags = new String[] {"TagA", "TagB", "TagC"};for (int i = 0; i < 10; i++) {Message msg = new Message("SqlFilterTest",tags[i % tags.length],("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));//设置sql过滤的属性msg.putUserProperty("a", String.valueOf(i));SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);}producer.shutdown();}
}
用MessageSelector.bySql来使用sql筛选消息
package org.apache.rocketmq.example.filter;import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;public class SqlFilterConsumer {public static void main(String[] args) throws Exception {DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");consumer.setNamesrvAddr("127.0.0.1:9876");// Don't forget to set enablePropertyFilter=true in brokerconsumer.subscribe("SqlFilterTest",MessageSelector.bySql("(TAGS is not null and TAGS in ('TagA', 'TagB'))" +"and (a is not null and a between 0 and 3)"));consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.start();System.out.printf("Consumer Started.%n");}
}
5、分布式事务消息
业务场景:用户A转账100元给用户B,这个业务比较简单,具体的步骤:
1、用户A的账户先扣除100元
2、再把用户B的账户加100元
RocketMq把消息分为两个阶段:半事务阶段和确认阶段
半事务阶段:
该阶段主要发一个消息到rocketmq,但该消息只储存在commitlog中,但consumeQueue中不可见,也就是消费端(订阅端)无法看到此消息
确认阶段(commit/rollback):
该阶段主要是把半事务消息保存到consumeQueue中,即让消费端可以看到此消息,也就是可以消费此消息。如果是rollback就不保存。
整个流程:
1、A在扣款之前,先发送半事务消息
2、发送预备消息成功后,执行本地扣款事务
3、扣款成功后,再发送确认消息
4、B消息端(加钱业务)可以看到确认消息,消费此消息,进行加钱
注意:上面的确认消息可以为commit消息,可以被订阅者消费;也可以是Rollback消息,即执行本地扣款事务失败后,提交rollback消息,即删除那个半事务消息,订阅者无法消费。这样就可以解决以下异常问题:
异常1:如果发送半事务消息失败,下面的流程不会走下去,这个是正常的。
异常2:如果发送半事务消息成功,但执行本地事务失败。这个也没有问题,因为此半事务消息不会被消费端订阅到,消费端不会执行业务。
异常3:如果发送半事务消息成功,执行本地事务成功,但发送确认消息失败;这个就有问题了,因为用户A扣款成功了,但加钱业务没有订阅到确认消息,无法加钱。这里出现了数据不一致。
RocketMq如何解决上面的问题,核心思路就是【事务回查】,也就是RocketMq会定时遍历commitlog中的半事务消息。
对于异常3,发送半事务消息成功,本地扣款事务成功,但发送确认消息失败;因为RocketMq会进行回查半事务消息,在回查后发现业务已经扣款成功了,就补发“发送commit确认消息”;这样加钱业务就可以订阅此消息了。
这个思路其实把异常2也解决了,如果本地事务没有执行成功,RocketMQ回查业务,发现没有执行成功,就会发送RollBack确认消息,把消息进行删除。
同时还要注意的点是,RocketMQ不能保障消息的重复,所以在消费端一定要做幂等性处理。
除此之外,如果消费端发生消费失败,同时也需要做重试,如果重试多次,消息会进入死信队列,这个时候也需要进行特殊的处理。(一般就是把A已经处理完的业务进行回退)
package org.apache.rocketmq.example.transaction;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;import java.io.UnsupportedEncodingException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;public class TransactionProducer {public static final String PRODUCER_GROUP = "please_rename_unique_group_name";public static final String DEFAULT_NAMESRVADDR = "127.0.0.1:9876";public static final String TOPIC = "TopicTest1234";public static final int MESSAGE_COUNT = 10;public static void main(String[] args) throws MQClientException, InterruptedException {TransactionListener transactionListener = new TransactionListenerImpl();//创建事务监听器TransactionMQProducer producer = new TransactionMQProducer(PRODUCER_GROUP);producer.setNamesrvAddr(DEFAULT_NAMESRVADDR);//创建线程池ExecutorService executorService = new ThreadPoolExecutor(2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000), r -> {Thread thread = new Thread(r);thread.setName("client-transaction-msg-check-thread");return thread;});producer.setExecutorService(executorService);//设置生产者回查线程池producer.setTransactionListener(transactionListener);//生产者设置事务回查监听器producer.start();//启动消息生产者String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};for (int i = 0; i < MESSAGE_COUNT; i++) {try {Message msg =new Message(TOPIC, tags[i % tags.length], "KEY" + i,("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult.getSendStatus());//半事务消息是否成功Thread.sleep(10);} catch (MQClientException | UnsupportedEncodingException e) {e.printStackTrace();}}for (int i = 0; i < 100000; i++) {//等待Thread.sleep(1000);}producer.shutdown();}
}
事务回查案例代码
package org.apache.rocketmq.example.transaction;import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;public class TransactionListenerImpl implements TransactionListener {private AtomicInteger transactionIndex = new AtomicInteger(0);private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();//执行本地事务@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {int value = transactionIndex.getAndIncrement();int status = value % 3;localTrans.put(msg.getTransactionId(), status);return LocalTransactionState.UNKNOW;}//事务回查,默认60秒检查一次@Overridepublic LocalTransactionState checkLocalTransaction(MessageExt msg) {Integer status = localTrans.get(msg.getTransactionId());if (null != status) {switch (status) {case 0:return LocalTransactionState.UNKNOW;case 1:return LocalTransactionState.COMMIT_MESSAGE;case 2:return LocalTransactionState.ROLLBACK_MESSAGE;default:return LocalTransactionState.COMMIT_MESSAGE;}}return LocalTransactionState.COMMIT_MESSAGE;}
}
1、事务消息不支持延时消息和批量消息。
2、事务回查的间隔时间:BrokerConfig. transactionCheckInterval 通过Broker的配置文件设置好。
3、为了避免单个消息被检查太多次而导致半队列消息累积,我们默认将单个消息的检查次数限制为 15 次,但是用户可以通过 Broker 配置文件的 transactionCheckMax参数来修改此限制。如果已经检查某条消息超过 N 次的话( N = transactionCheckMax ) 则 Broker 将丢弃此消息,并在默认情况下同时打印错误日志。用户可以通过重写 AbstractTransactionCheckListener 类来修改这个行为。、
4、事务消息将在 Broker 配置文件中的参数 transactionMsgTimeout 这样的特定时间长度之后被检查。当发送事务消息时,用户还可以通过设置用户属性 CHECK_IMMUNITY_TIME_IN_SECONDS 来改变这个限制,该参数优先于 transactionMsgTimeout 参数。
5、事务性消息可能不止一次被检查或消费。
6、事务性消息中用到了生产者群组,这种就是一种高可用机制,用来确保事务消息的可靠性。
7、提交给用户的目标主题消息可能会失败,目前这依日志的记录而定。它的高可用性通过 RocketMQ 本身的高可用性机制来保证,如果希望确保事务消息不丢失、并且事务完整性得到保证,建议使用同步的双重写入机制。
8、事务消息的生产者 ID 不能与其他类型消息的生产者 ID 共享。与其他类型的消息不同,事务消息允许反向查询、MQ服务器能通过它们的生产者 ID 查询到消费者。
6、Request-Reply消息
什么是Request-Reply?
RocketMQ 中"Request-Reply"模式允许Producer发出消息后,以同步或异步的形式等Consumer消费并返回一个响应消息,达到类似RPC的调用过程。
RocketMQ从4.6.0版本开始支持这种模式。这种模式的流程如下图:
与RPC的不同
RocketMQ的这种调用方式跟dubbo之类的RPC调用非常类似,那为什么不使用dubbo?而要使用RocketMQ的这种RPC调用呢?原因如下:
基于RocketMQ来实现RPC可以快速搭建服务的消息总线,实现自己的RPC框架。
基于RocketMQ来实现RPC可以方便的收集调用的相关信息,能够实现调用链路追踪和分析。
基于RocketMQ来实现RPC既可以解耦两个系统之间的依赖,也可以实现跨网络区域实现系统间的同步调用,这里RocketMQ扮演的是一个类似于网关的角色。
Request-Reply的实现逻辑
在以上图中,可以看到,使用RPC模式还是三方:Producer、Broker、Consumer。
在Producer中进行消息的发送时,可以随便指定Topic,但是需要送入reply_to_client、correlation_id两个关键信息,reply_to_client记录着请求方的clientlD(用于Broker响应时确定client端)。而correlation_id是标识每次请求的,用于响应消息与请求的配对。而在进行发送消息时,也有两种模式,一种同步阻塞,另外一种异步非阻塞,这些跟之前普通消息的三种发送方式类似。
Broker端除了Producer发送时指定的Topic之外,还有一个Reply_Topic,这个以集群名_REPLY_TOPIC命名(不管RPC生产者主题有多少,这个在一个集群中只有一个),主要用于Consumer响应RPC消息的路由发现。
Consumer端除了消费监听之外,还需要加入一个消息的生产(用于RPC的响应消息),必须使用客户端提供的MessageUtil进行消息的包装,防止关键信息丢失从而导致Producer不能收到RPC消息响应。
package org.apache.rocketmq.example.rpc;import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;public class RequestProducer {public static void main(String[] args) throws MQClientException, InterruptedException {String producerGroup = "please_rename_unique_group_name";String topic = "RequestTopic";long ttl = 3000;//超时时间DefaultMQProducer producer = new DefaultMQProducer(producerGroup);producer.setNamesrvAddr("127.0.0.1:9876");producer.start();try {Message msg = new Message(topic,"","Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));long begin = System.currentTimeMillis();Message retMsg = producer.request(msg, ttl);long cost = System.currentTimeMillis() - begin;System.out.printf("request to <%s> cost: %d replyMessage: %s %n", topic, cost, retMsg);} catch (Exception e) {e.printStackTrace();}producer.shutdown();}
}
package org.apache.rocketmq.example.rpc;import java.nio.charset.StandardCharsets;
import java.util.List;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.client.utils.MessageUtil;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.remoting.exception.RemotingException;public class ResponseConsumer {public static void main(String[] args) throws InterruptedException, MQClientException {String producerGroup = "please_rename_unique_group_name";String consumerGroup = "please_rename_unique_group_name";String topic = "RequestTopic";// create a producer to send reply messageDefaultMQProducer replyProducer = new DefaultMQProducer(producerGroup);replyProducer.setNamesrvAddr("127.0.0.1:9876");replyProducer.start();// create consumerDefaultMQPushConsumer consumer = new DefaultMQPushConsumer(consumerGroup);consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);consumer.setNamesrvAddr("127.0.0.1:9876");// recommend client configsconsumer.setPullTimeDelayMillsWhenException(0L);consumer.registerMessageListener(new MessageListenerConcurrently() {@Overridepublic ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);for (MessageExt msg : msgs) {try {System.out.printf("handle message: %s %n", msg.toString());String replyTo = MessageUtil.getReplyToClient(msg);byte[] replyContent = "reply message contents.".getBytes(StandardCharsets.UTF_8);// create reply message with given util, do not create reply message by yourselfMessage replyMessage = MessageUtil.createReplyMessage(msg, replyContent);// send reply message with producerSendResult replyResult = replyProducer.send(replyMessage, 3000);System.out.printf("reply to %s , %s %n", replyTo, replyResult.toString());} catch (MQClientException | RemotingException | MQBrokerException | InterruptedException e) {e.printStackTrace();}}return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}});consumer.subscribe(topic, "*");consumer.start();System.out.printf("Consumer Started.%n");}
}
7、死信队列
当一条消息初次消费失败,消息队列 RocketMQ 会自动进行消息重试;达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息,此时,消息队列 RocketMQ 不会立刻将消息丢弃,而是将其发送到该消费者对应的特殊队列中。
在消息队列 RocketMQ 中,这种正常情况下无法被消费的消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue)。
死信特性
死信消息具有以下特性:
不会再被消费者正常消费。
有效期与正常消息相同,均为 3 天,3 天后会被自动删除。因此,请在死信消息产生后的 3 天内及时处理。
死信队列具有以下特性:
不会再被消费者正常消费。
一个死信队列对应一个 Group ID, 而不是对应单个消费者实例。
如果一个 Group ID 未产生死信消息,消息队列 RocketMQ 不会为其创建相应的死信队列。
一个死信队列包含了对应 Group ID 产生的所有死信消息,不论该消息属于哪个 Topic。
RocketMQ Learning(一)
不是你觉的悟到的东西给了你,你也接不住!
干我们这行,啥时候懈怠,就意味着长进的停止,长进的停止就意味着被淘汰,只能往前冲,直到凤凰涅槃的一天!