JAVA开发( 腾讯云消息队列 RocketMQ使用总结 )

一、问题背景

      之所以需要不停的总结是因为在java开发过程中使用到中间件实在太多了,久久不用就会慢慢变得生疏,有时候一个中间很久没使用,可能经过了很多版本的迭代,使用起来又有区别。所以还是得不断总结更新。最近博主就是在使用腾讯云RocketMQ中遇到了点问题,排查了很久,也不知道什么原因,最好咨询了了腾讯官方技术支撑,最终解决。现在很多中间件都是各位巨头经过封装,然后卖给中小企业,有时候遇到点问题,还不容易在网上搜索资料排查到,都只能在巨头的生态里摸索,排查,请教。

二、RocketMQ产品概述

消息队列 RocketMQ 版(TDMQ for RocketMQ,简称 TDMQ RocketMQ 版)是腾讯云基于 Apache RocketMQ 构建的分布式消息中间件,完全兼容 Apache RocketMQ 的各个组件与概念,支持开源社区版本的客户端零改造接入。

消息队列 RocketMQ 版具有低延迟、高性能、高可靠、万亿级消息容量和灵活可扩展等特点,可为分布式应用系统提供异步解耦和削峰填谷的能力,同时也具备互联网应用所需的海量消息堆积、高吞吐、可靠重试等特性。

三、RocketMQ的组成部分和基本概念

Producer 集群: 客户侧应用,负责生产并发送消息。

Consumer 集群:客户侧应用,负责订阅和消费处理消息。

Nameserver 集群: 服务端应用,负责路由寻址和 Broker 心跳注册。

心跳注册:NameServer 相当于注册中心的角色,各个角色的机器都要定时向 NameServer 上报自己的状态,如果超时未上报,NameServer 会认为某个机器出现故障不可用了,从而将这个机器从可用列表中删除。

路由寻址:每个 NameServer 中都保存着 Broker 集群的整个路由信息和用于客户端查询的队列信息,生产者和消费者通过 NameServer 去获取整个Broker 集群的路由信息,从而进行消息的投递和消费。

Broker集群:服务端应用,负责接收,存储,投递消息,支持主从多副本模式,从节点可选部署,实际现网公有云上数据高可靠直接依赖云盘三副本。

管控集群: 服务端应用,可视化的管控控制台,负责运维整个集群,例如源数据的收发和管理等。

消息(Message)

消息系统所传输信息的物理载体,生产和消费数据的最小单位。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。

主题(Topic)

Topic 表示一类消息的集合,每个主题包含若干消息,是 RocketMQ 进行消息订阅的基本单位。

消息标签(MessageTag)

为消息设置的标签,用于将同一个 Topic 下区分不同类型的消息,可以理解为 Topic 是消息的一级分类,Tag 是消息的二级分类。

消息队列(MessageQueue)

存储消息的物理实体,一个 Topic 可以包含多个 Queue,Queue 也叫消息分区,一个 Queue 中的消息只能被一个消费者组中的一个消费者消费,一个 Queue 中的消息不允许同一个消费者组中的多个消费者同时消费。

消息位点(MessageQueueOffset)​

消息是按到达 RocketMQ 服务端的先后顺序存储在指定主题的多个队列中,每条消息在队列中都有一个唯一的 Long 类型坐标,这个坐标被定义为消息位点。

消费位点(ConsumerOffset)​

一条消息被某个消费者消费完成后不会立即从队列中删除, RocketMQ 会基于每个消费者分组记录消费过的最新一条消息的位点,即消费位点。

消息索引(MessageKey)​

消息索引是 RocketMQ 提供的面向消息的索引属性。通过设置的消息索引可以快速查找到对应的消息内容。

生产者(Producer)​

生产者是 RocketMQ 系统中用来构建并传输消息到服务端的运行实体。生产者通常被集成在业务系统中,将业务消息按照要求封装成消息并发送至服务端。

消费者(Consumer)​

消费者是 RocketMQ 中用来接收并处理消息的运行实体。消费者通常被集成在业务系统中,从服务端获取消息,并将消息转化成业务可理解的信息,供业务逻辑处理。

分组(Group)

可分为生产者组和消费者组:

生产者组:同一类 Producer 的集合,这类 Producer 发送同一类消息且发送逻辑一致。如果发送的是事务消息,且生产者发送后崩溃,则 Broker 服务器会联系同一个生产者组的其他生产者实例以提交或者回溯消费。

消费者组:同一类 Consumer 的集合,这类 Consumer 通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面实现了负载均衡和容错。消费者组的消费者实例必须订阅完全相同的 Topic。

消息类型(MessageType)​

按照消息传输特性的不同而定义的分类,用于类型管理和安全校验。RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。

普通消息

普通消息是一种基础的消息类型,由生产投递到指定 Topic 后,被订阅了该 Topic 的消费者所消费。普通消息的 Topic 中无顺序的概念,可以使用多个分区数来提升消息的生产和消费效率,在吞吐量巨大时其性能最好。

顺序消息

顺序消息是消息队列 RocketMQ 提供的一种高级消息类型,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发送的消息先消费,后发送的消息后消费。

重试队列

重试队列是一种为了确保消息被正常消费而设计的队列。当某些消息第一次被消费者消费后,没有得到正常的回应,则会进入重试队列,当重试达到一定次数后,停止重试,投递到死信队列中。

由于实际场景中,可能会存在的一些临时短暂的问题(如网络抖动,服务重启等)导致消息无法及时被处理,但短暂时间过后又恢复正常。这种场景下,重试队列的重试机制就可以很好解决此类问题。

死信队列

死信队列是一种特殊的消息队列,用于集中处理无法被正常消费的消息的队列。当消息在重试队列中达到一定重试次数后仍未能被正常消费,TDMQ 会判定这条消息在当前情况下无法被消费,将其投递至死信队列。

实际场景中,消息可能会由于持续一段时间的服务宕机,网络断连而无法被消费。这种场景下,消息不会被立刻丢弃,死信队列会对这种消息进行较为长期的持久化,用户可以在找到对应解决方案后,创建消费者订阅死信队列来完成对当时无法处理消息的处理。

集群消费

集群消费:当使用集群消费模式时,任意一条消息只需要被集群内的任意一个消费者处理即可。适用于每条消息只需要被处理一次的场景。

广播消费

广播消费:当使用广播消费模式时,每条消息会被推送给集群内所有注册过的消费者,保证消息至少被每个消费者消费一次。适用于每条消息需要被集群下每一个消费者处理的场景。

消息过滤​

消费者可以通过订阅指定消息标签(Tag)对消息进行过滤,确保最终只接收被过滤后的消息合集。过滤规则的计算和匹配在 RocketMQ 的服务端完成。

重置消费位点​

以时间轴为坐标,在消息持久化存储的时间范围内,重新设置消费者分组对已订阅主题的消费进度,设置完成后消费者将接收设定时间点之后,由生产者发送到 RocketMQ 服务端的消息。

消息轨迹​

在一条消息从生产者发出到消费者接收并处理过程中,由各个相关节点的时间、地点等数据汇聚而成的完整链路信息。通过消息轨迹,您能清晰定位消息从生产者发出,经由 RocketMQ 服务端,投递给消费者的完整链路,方便定位排查问题。

消息堆积​

生产者已经将消息发送到 RocketMQ 的服务端,但由于消费者的消费能力有限,未能在短时间内将所有消息正确消费掉,此时在服务端保存着未被消费的消息,该状态即消息堆积。

四、应用场景

异步解耦

交易引擎作为腾讯计费最核心的系统,每笔交易订单数据需要被几十个下游业务系统关注,包括库存系统、仓储系统、促销系统、积分系统等,多个系统对消息的处理逻辑不一致,单个系统不可能去适配每一个关联业务。此时,TDMQ RocketMQ 版可解除多个业务系统之间的耦合度,减少系统之间影响,提升核心业务响应速度和健壮性。

削峰填谷

企业不定时举办的一些营销活动,新品发布上线,节日抢红包等,往往都会带来临时性的流量洪峰,这对后端的各个应用系统考验是十分巨大的,如果直接采用扩容方式应对又会带来一定的资源浪费。RocketMQ 可以应对突发性的流量洪峰,在峰值时堆积消息,而在峰值过去后下游系统慢慢消费消息,解决上下游处理能力不匹配,提升系统可用性。

 还有等等

顺序收发

顺序消息是消息队列 RocketMQ 提供的一种高级消息类型,对于一个指定的Topic,消息严格按照先进先出(FIFO)的原则进行消息发布和消费,即先发送的消息先消费,后发送的消息后消费。顺序消息常用于以下业务场景:

订单创建场景:在一些电商系统中,同一个订单相关的创建订单消息、订单支付消息、订单退款消息、订单物流消息必须严格按照先后顺序来进行生产或者消费,否则消费中传递订单状态会发生紊乱,影响业务的正常进行。因此,该订单的消息必须按照一定的顺序在客户端和消息队列中进行生产和消费,同时消息之间有先后的依赖关系,后一条消息需要依赖于前一条消息的处理结果。

日志同步场景:在有序事件处理或者数据实时增量同步的场景中,顺序消息也能发挥较大的作用,如同步 mysql 的 binlog 日志时,需要保证数据库的操作是有顺序的。

金融场景:在一些撮合交易的场景下,比如某些证券交易,在价格相同的情况下,先出价者优先处理,则需要按照FIFO的方式生产和消费顺序消息。

五、如何使用

 集成到springBoot

mq配置信息项

server:port: 8082#rocketmq配置信息rocketmq:# tdmq-rocketmq服务接入地址name-server: rocketmq-xxx.rocketmq.ap-bj.public.tencenttdmq.com:9876# 生产者配置producer:# 生产者组名group: group111# 角色密钥access-key: eyJrZXlJZC....# 已授权的角色名称secret-key: admin# 消费者公共配置consumer:# 角色密钥access-key: eyJrZXlJZC....# 已授权的角色名称secret-key: admin# 用户自定义配置namespace: MQ_INST_rocketmqxxxxxxxxproducer1:topic: testdev1consumer1:group: group111topic: testdev1subExpression: TAG1consumer2:group: group222topic: testdev1subExpression: TAG2

<!-- in your <dependencies> block -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-client</artifactId><version>4.9.3</version>
</dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-acl</artifactId><version>4.9.3</version>
</dependency>

创建生产者 

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer(namespace, groupName,new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)) // ACL权限
);
// 设置NameServer的地址
producer.setNamesrvAddr(nameserver);
// 启动Producer实例
producer.start();

发送消息

for (int i = 0; i < 10; i++) {// 创建消息实例,设置topic和消息内容Message msg = new Message(topic_name, "TAG", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));// 发送消息SendResult sendResult = producer.send(msg);System.out.printf("%s%n", sendResult);
}

异步发送

// 设置发送失败后不重试
producer.setRetryTimesWhenSendAsyncFailed(0);
// 设置发送消息的数量
int messageCount = 10;
final CountDownLatch countDownLatch = new CountDownLatch(messageCount);
for (int i = 0; i < messageCount; i++) {try {final int index = i;// 创建消息实体,设置topic和消息内容Message msg = new Message(topic_name, "TAG", ("Hello rocketMq " + index).getBytes(RemotingHelper.DEFAULT_CHARSET));producer.send(msg, new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 消息发送成功逻辑countDownLatch.countDown();System.out.printf("%-10d OK %s %n", index, sendResult.getMsgId());}@Overridepublic void onException(Throwable e) {// 消息发送失败逻辑countDownLatch.countDown();System.out.printf("%-10d Exception %s %n", index, e);e.printStackTrace();}});} catch (Exception e) {e.printStackTrace();}
}
countDownLatch.await(5, TimeUnit.SECONDS);

创建消费者

// 实例化消费者
DefaultMQPushConsumer pushConsumer = new DefaultMQPushConsumer(namespace,                                                  groupName,                                              new AclClientRPCHook(new SessionCredentials(accessKey, secretKey))); //ACL权限
// 设置NameServer的地址
pushConsumer.setNamesrvAddr(nameserver);

消费信息

// 实例化消费者
DefaultLitePullConsumer pullConsumer = new DefaultLitePullConsumer(namespace,                                               groupName,                                             new AclClientRPCHook(new SessionCredentials(accessKey, secretKey)));
// 设置NameServer的地址
pullConsumer.setNamesrvAddr(nameserver);
// 设置从第一个偏移量开始消费
pullConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);

发布与订阅模式

发布到订阅

// 订阅topic
pushConsumer.subscribe(topic_name, "*");
// 注册回调实现类来处理从broker拉取回来的消息
pushConsumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {// 消息处理逻辑System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);// 标记该消息已经被成功消费, 根据消费情况,返回处理状态return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
// 启动消费者实例
pushConsumer.start();

订阅信息

// 订阅topic
pullConsumer.subscribe(topic_name, "*");
// 启动消费者实例
pullConsumer.start();
try {System.out.printf("Consumer Started.%n");while (true) {// 拉取消息List<MessageExt> messageExts = pullConsumer.poll();System.out.printf("%s%n", messageExts);}
} finally {pullConsumer.shutdown();
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/13597.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于matlab使用车载激光雷达数据在惯性测量单元读数帮助下构建地图(附源码)

一、前言 此示例演示如何处理来自安装在车辆上的传感器的 3-D 激光雷达数据&#xff0c;以便在惯性测量单元 &#xff08;IMU&#xff09; 读数的帮助下逐步构建地图。这样的地图可以促进车辆导航的路径规划&#xff0c;也可以用于定位。为了评估生成的地图&#xff0c;此示例…

Lingo优化软件初步

一、Lingo软件介绍 1、lingo软件的简单介绍 美国芝加哥大学的Linus Schrage教授于1980年左右开发的专门用于求解最优化问题的软件包&#xff0c;后经多年完善与扩充&#xff0c;并成立了LINDO系统公司进行商业运作取得巨大成功。根据 LINDO公司主页&#xff08;http://www.li…

FPGA入门系列12--RAM的使用1

文章简介 本系列文章主要针对FPGA初学者编写&#xff0c;包括FPGA的模块书写、基础语法、状态机、RAM、UART、SPI、VGA、以及功能验证等。将每一个知识点作为一个章节进行讲解&#xff0c;旨在更快速的提升初学者在FPGA开发方面的能力&#xff0c;每一个章节中都有针对性的代码…

Spring Boot 中的认证是什么,如何使用

Spring Boot 中的认证是什么&#xff0c;如何使用 在 Web 应用程序中&#xff0c;认证是一项重要的安全措施。Spring Boot 提供了丰富的认证机制&#xff0c;可以帮助我们轻松地实现各种认证需求。本文将介绍 Spring Boot 中的认证是什么&#xff0c;以及如何使用 Spring Boot…

多元回归预测 | Matlab阿基米德算法(AOA)优化极限梯度提升树XGBoost回归预测,AOA-XGBoost回归预测模型,多变输入模型

文章目录 效果一览文章概述部分源码参考资料效果一览 文章概述 阿基米德算法(AOA)优化极限梯度提升树XGBoost回归预测,AOA-XGBoost回归预测模型,多变输入模型,多变量输入模型,多变量输入模型,matlab代码回归预测,多变量输入模型,多变量输入模型 评价指标包括:MAE、RMSE和R2…

oracle新建库(表空间)表

文章目录 前言一、sqlplus登录二、表空间1.新建表空间2. 查看表空间3. 查看表空间和对应数据文件4.表空间增加数据文件5.删除单个数据文件&#xff08;只有一个默认的会删除失败&#xff09;6.删除表空间及数据文件(慎用) 三、创建新用户并指定表空间1.去掉前缀2.新建用户&…

npm 记录

转 请看原文&#xff0c;我只是怕原文没了&#xff0c;复制了一遍。我目的是想记录缓存那一块。 前端工程化 - 剖析npm的包管理机制 - 掘金 在content-v2/sha512 执行 grep -n "https://registry.npmjs.org/base64-js/-/base64-js-1.0.1.tgz" -r ./ 获取缓存包…

整数序列(山东大学考研机试题)

水仙花数(中南考研机试题) 链接:3644. 水仙花数 - AcWing题库 /* 暴力枚举罢了 */ #include<iostream> using namespace std; const int N1e3100; int book[N]; int pow3(int k){return k*k*k; } int main() {int m,n;for(int i100;i<999;i){int t1,t2,t3;t1 i%10;t…

Redis从入门到精通【进阶篇】之消息传递发布订阅模式详解

文章目录 0. 前言1. 基本原理1.1 基于频道(Channel)的发布/订阅1.2 基于模式(Pattern)的发布/订阅 2. Redis 发布订阅实际应用2.1 Redis Sentinel2.1 SpringBoot Redis发布/订阅 3. Redis从入门到精通系列文章 0. 前言 发布订阅模式&#xff08;Publish-Subscribe Pattern&…

前端Vue自定义轮播图视频播放组件 仿京东商品详情轮播图视频Video播放效果 可图片预览

前端Vue自定义轮播图视频播放组件 仿京东商品详情轮播图视频Video播放 &#xff0c;可图片预览&#xff0c;下载完整代码请访问uni-app插件市场地址&#xff1a;https://ext.dcloud.net.cn/plugin?id13325 效果图如下: # cc-videoSwiper #### 使用方法 使用方法 <!-- g…

SSM学习笔记-------Spring(一)

SSM学习笔记-------Spring&#xff08;一&#xff09; Spring_day011、课程介绍1.1 为什么要学?1.2 学什么?1.3 怎么学? 2、Spring相关概念2.1 初识Spring2.1.1 Spring家族2.1.2 了解Spring发展史 2.2 Spring系统架构2.2.1 系统架构图2.2.2 课程学习路线 2.3 Spring核心概念…

Mac如何在终端使用diskutil命令装载和卸载推出外接硬盘

最近用 macOS 装载外接硬盘的时候&#xff0c;使用mount死活装不上&#xff0c;很多文章也没详细的讲各种情况&#xff0c;所以就写一篇博客来记录一下。 如何装载和卸载硬盘&#xff08;或者说分区&#xff09; mount和umount是在 macOS 上是不能用的&#xff0c;如果使用会…