一、问题背景
之所以需要不停的总结是因为在java开发过程中使用到中间件实在太多了,久久不用就会慢慢变得生疏,有时候一个中间很久没使用,可能经过了很多版本的迭代,使用起来又有区别。所以还是得不断总结更新。最近博主就是在使用腾讯云RocketMQ中遇到了点问题,排查了很久,也不知道什么原因,最好咨询了了腾讯官方技术支撑,最终解决。现在很多中间件都是各位巨头经过封装,然后卖给中小企业,有时候遇到点问题,还不容易在网上搜索资料排查到,都只能在巨头的生态里摸索,排查,请教。
二、RocketMQ产品概述
消息队列 RocketMQ 版(TDMQ for RocketMQ,简称 TDMQ RocketMQ 版)是腾讯云基于 Apache RocketMQ 构建的分布式消息中间件,完全兼容 Apache RocketMQ 的各个组件与概念,支持开源社区版本的客户端零改造接入。
消息队列 RocketMQ 版具有低延迟、高性能、高可靠、万亿级消息容量和灵活可扩展等特点,可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。
三、RocketMQ的组成部分和基本概念
Producer 集群: 客户侧应用,负责生产并发送消息。
Consumer 集群:客户侧应用,负责订阅和消费处理消息。
Nameserver 集群: 服务端应用,负责路由寻址和 Broker 心跳注册。
心跳注册:NameServer 相当于注册中心的角色,各个角色的机器都要定时向 NameServer 上报自己的状态,如果超时未上报,NameServer 会认为某个机器出现故障不可用了,从而将这个机器从可用列表中删除。
路由寻址:每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息,生产者和消费者通过 NameServer 去获取整个Broker 集群的路由信息,从而进行消息的投递和消费。
Broker集群:服务端应用,负责接收,存储,投递消息,支持主从多副本模式,从节点可选部署,实际现网公有云上数据高可靠直接依赖云盘三副本。
管控集群: 服务端应用,可视化的管控控制台,负责运维整个集群,例如源数据的收发和管理等。
消息(Message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。
主题(Topic)
Topic 表示一类消息的集合,每个主题包含若干消息,是 RocketMQ 进行消息订阅的基本单位。
消息标签(MessageTag)
为消息设置的标签,用于将同一个 Topic 下区分不同类型的消息,可以理解为 Topic 是消息的一级分类,Tag 是消息的二级分类。
消息队列(MessageQueue)
存储消息的物理实体,一个 Topic 可以包含多个 Queue,Queue 也叫消息分区,一个 Queue 中的消息只能被一个消费者组中的一个消费者消费,一个 Queue 中的消息不允许同一个消费者组中的多个消费者同时消费。
消息位点(MessageQueueOffset)
消息是按到达 RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的 Long 类型坐标,这个坐标被定义为消息位点。
消费位点(ConsumerOffset)
一条消息被某个消费者消费完成后不会立即从队列中删除, RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。
消息索引(MessageKey)
消息索引是 RocketMQ 提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。
生产者(Producer)
生产者是 RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。
消费者(Consumer)
消费者是 RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。
分组(Group)
可分为生产者组和消费者组:
生产者组:同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息,且生产者发送后崩溃,则 Broker 服务器会联系同一个生产者组的其他生产者实例以提交或者回溯消费。
消费者组:同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面实现了负载均衡和容错。消费者组的消费者实例必须订阅完全相同的 Topic。
消息类型(MessageType)
按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。
普通消息
普通消息是一种基础的消息类型,由生产投递到指定 Topic 后,被订阅了该 Topic 的消费者所消费。普通消息的 Topic 中无顺序的概念,可以使用多个分区数来提升消息的生产和消费效率,在吞吐量巨大时其性能最好。
顺序消息
顺序消息是消息队列 RocketMQ 提供的一种高级消息类型,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发送的消息先消费,后发送的消息后消费。
重试队列
重试队列是一种为了确保消息被正常消费而设计的队列。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试队列,当重试达到一定次数后,停止重试,投递到死信队列中。
由于实际场景中,可能会存在的一些临时短暂的问题(如网络抖动,服务重启等)导致消息无法及时被处理,但短暂时间过后又恢复正常。这种场景下,重试队列的重试机制就可以很好解决此类问题。
死信队列
死信队列是一种特殊的消息队列,用于集中处理无法被正常消费的消息的队列。当消息在重试队列中达到一定重试次数后仍未能被正常消费,TDMQ 会判定这条消息在当前情况下无法被消费,将其投递至死信队列。
实际场景中,消息可能会由于持续一段时间的服务宕机,网络断连而无法被消费。这种场景下,消息不会被立刻丢弃,死信队列会对这种消息进行较为长期的持久化,用户可以在找到对应解决方案后,创建消费者订阅死信队列来完成对当时无法处理消息的处理。
集群消费
集群消费:当使用集群消费模式时,任意一条消息只需要被集群内的任意一个消费者处理即可。适用于每条消息只需要被处理一次的场景。
广播消费
广播消费:当使用广播消费模式时,每条消息会被推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。适用于每条消息需要被集群下每一个消费者处理的场景。
消息过滤
消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在 RocketMQ 的服务端完成。
重置消费位点
以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到 RocketMQ 服务端的消息。
消息轨迹
在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由 RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。
消息堆积
生产者已经将消息发送到 RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。
四、应用场景
异步解耦
交易引擎作为腾讯计费最核心的系统,每笔交易订单数据需要被几十个下游业务系统关注,包括库存系统、仓储系统、促销系统、积分系统等,多个系统对消息的处理逻辑不一致,单个系统不可能去适配每一个关联业务。此时,TDMQ RocketMQ 版可解除多个业务系统之间的耦合度,减少系统之间影响,提升核心业务响应速度和健壮性。
削峰填谷
企业不定时举办的一些营销活动,新品发布上线,节日抢红包等,往往都会带来临时性的流量洪峰,这对后端的各个应用系统考验是十分巨大的,如果直接采用扩容方式应对又会带来一定的资源浪费。RocketMQ 可以应对突发性的流量洪峰,在峰值时堆积消息,而在峰值过去后下游系统慢慢消费消息,解决上下游处理能力不匹配,提升系统可用性。
还有等等
顺序收发
顺序消息是消息队列 RocketMQ 提供的一种高级消息类型,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发送的消息先消费,后发送的消息后消费。顺序消息常用于以下业务场景:
订单创建场景:在一些电商系统中,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息必须严格按照先后顺序来进行生产或者消费,否则消费中传递订单状态会发生紊乱,影响业务的正常进行。因此,该订单的消息必须按照一定的顺序在客户端和消息队列中进行生产和消费,同时消息之间有先后的依赖关系,后一条消息需要依赖于前一条消息的处理结果。
日志同步场景:在有序事件处理或者数据实时增量同步的场景中,顺序消息也能发挥较大的作用,如同步 mysql 的 binlog 日志时,需要保证数据库的操作是有顺序的。
金融场景:在一些撮合交易的场景下,比如某些证券交易,在价格相同的情况下,先出价者优先处理,则需要按照FIFO的方式生产和消费顺序消息。
五、如何使用
集成到springBoot
mq配置信息项
server:port: 8082#rocketmq配置信息rocketmq:# tdmq-rocketmq服务接入地址name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876# 生产者配置producer:# 生产者组名group: group111# 角色密钥access-key: eyJrZXlJZC....# 已授权的角色名称secret-key: admin# 消费者公共配置consumer:# 角色密钥access-key: eyJrZXlJZC....# 已授权的角色名称secret-key: admin# 用户自定义配置namespace: MQ_INST_rocketmqxxxxxxxxproducer1:topic: testdev1consumer1:group: group111topic: testdev1subExpression: TAG1consumer2:group: group222topic: testdev1subExpression: TAG2
<!-- in your <dependencies> block -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.3</version>
</dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.3</version>
</dependency>
创建生产者
// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限
);
// 设置NameServer的地址
producer.setNamesrvAddr(nameserver);
// 启动Producer实例
producer.start();
发送消息
for (int i = 0; i < 10; i++) {// 创建消息实例,设置topic和消息内容Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);
}
异步发送
// 设置发送失败后不重试
producer.setRetryTimesWhenSendAsyncFailed(0);
// 设置发送消息的数量
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {try {final int index = i;// 创建消息实体,设置topic和消息内容Message msg = new Message(topic_name, "TAG", ("Hello rocketMq " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 消息发送成功逻辑countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {// 消息发送失败逻辑countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}
}
countDownLatch.await(5, TimeUnit.SECONDS);
创建消费者
// 实例化消费者
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(namespace, groupName, new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限
// 设置NameServer的地址
pushConsumer.setNamesrvAddr(nameserver);
消费信息
// 实例化消费者
DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(namespace, groupName, new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));
// 设置NameServer的地址
pullConsumer.setNamesrvAddr(nameserver);
// 设置从第一个偏移量开始消费
pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
发布与订阅模式
发布到订阅
// 订阅topic
pushConsumer.subscribe(topic_name, "*");
// 注册回调实现类来处理从broker拉取回来的消息
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
pushConsumer.start();
订阅信息
// 订阅topic
pullConsumer.subscribe(topic_name, "*");
// 启动消费者实例
pullConsumer.start();
try {System.out.printf("Consumer Started.%n");while (true) {// 拉取消息List<MessageExt> messageExts = pullConsumer.poll();System.out.printf("%s%n", messageExts);}
} finally {pullConsumer.shutdown();
}