spring boot 集成rocketMq + 基本使用

1. RocketMq基本概念

1. NameServer
每个NameServer结点之间是相互独立,彼此没有任何信息交互
启动NameServer。NameServer启动后监听端口,等待Broker、Producer、Consumer连接,
相当于一个路由控制中心。主要是用来保存topic路由信息,管理Broker
2. Broker
消息存储和中转角色,负责存储和转发消息
在启动时会向NameServer进行注册并且定时发送心跳包。心跳包中包含当前 Broker 信息
以及存储所有 Topic 信息。注册成功后,NameServer 集群中就有 Topic跟Broker 的映射关系。
3. topic : 一个消息的集合的名字
创建 Topic 时需要指定该 Topic 要存储在哪些 Broker 上,也可以在发送消息时自动创建Topic。
4. 生产者
生产者发送消息。启动时先从 NameServer 集群中的其中一台拉取到路由表,缓存到本地,
并从 NameServer 中获取当前发送的 Topic存在于哪些 Broker 上,
轮询从队列列表中选择一个队列(默认轮询)
5. 消费者
消费者跟其中一台NameServer建立连接,获取当前订阅Topic存在哪些Broker上,
然后直接跟Broker建立连接通道,然后开始消费消息

2. maven 引入starter

<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.2</version>
</dependency>

3.yml配置

3.1 生产者yml 配置

rocketmq:name-server: 127.0.0.1:9876producer:group: my-group# 发送消息超时时间send-message-timeout: 5000# 发送消息失败重试次数retry-times-when-send-failed: 2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

3.2 消费者yml 配置

rocketmq:name-server: 127.0.0.1:9876consumer:topic: topic_testgroup: consumer_my-group

4.生产者发送消息

4.1 一般消息

@Resourceprivate RocketMQTemplate rocketMQTemplate;/***  一般消息* Topic 与 Tag 都是业务上用来归类的标识,区别在于 Topic 是一级分类,而 Tag 可以理解为是二级分类。* 使用 Tag 可以实现对 Topic 中的消息进行过滤。* **/@GetMapping("/send")public String send(){rocketMQTemplate.convertAndSend("topic_test", "Hello, World!");rocketMQTemplate.convertAndSend("topic_test:tagB","Hello, World222--tagB");return "rocketMq普通消息发送完成";}

4.2 顺序消息

/** 支持消费者按照发送消息的先后顺序获取消息 */@GetMapping("/send/orderly")public String sendOrder(){//发送顺序消息,参数:topic,消息,hashkey,相同hashkey发送至同一个队列rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 1).build(),"queue");rocketMQTemplate.syncSendOrderly("topic_test:tagA", MessageBuilder.withPayload("消息编号" + 2).build(),"queue");return "rocketMq顺序-消息发送成功";}

4.3 同步消息

@GetMapping("/send/sync")public String sendMsg() {String message = "我是同步消息:" + LocalDateTime.now();SendResult result = rocketMQTemplate.syncSend("topic_test:tagA", MessageBuilder.withPayload(message).build());log.info("同步-消息发送成功:" + LocalDateTime.now());return "rocketMq 同步-消息发送成功:" + result.getSendStatus();}

4.4 异步消息

/** 发送异步消息 */@GetMapping("/send/async")public String asyncSendMsg(){String message = "我是异步消息:" + LocalDateTime.now();rocketMQTemplate.asyncSend("topic_test:tagA",message,new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {log.info("发送成功 (后执行),SendStatus = {}",sendResult.getSendStatus());}@Overridepublic void onException(Throwable throwable) {log.info("发送失败 (后执行)");}});return "rocketMq 异步-消息发送成功:" + LocalDateTime.now();}

 4.5 单向消息:一般用来发送日志等不重要的消息

@GetMapping("/send/oneWay")public String sendOneWayMessage() {String message =  "我是单向消息:"+LocalDateTime.now();this.rocketMQTemplate.sendOneWay("topic_test:tagA", message);log.info("单向发送消息完成:message = {}", message);return "rocketMq 单向-消息发送成功:" + LocalDateTime.now();}

 

4.6 延时消息

/** 延时消息 */@GetMapping("/sendDelay")public String sendDelay(){String message = "我是延时消息:" + LocalDateTime.now();// 第四个参数为延时级别,分为1-18:1、5、10、30、1m、2m、3m、...10m、20m、30m、1h、2hrocketMQTemplate.syncSend("topic_test:tagC", MessageBuilder.withPayload(message).build(), 3000, 2);return "rocketMq延时-消息发送成功";}

4.7 事务消息

4.7.1 事务消息发送代码

/** 事务消息 */@GetMapping("/send/transaction/{id}")public void sendTransactionMessage(@PathVariable("id") Integer id){//发送事务消息:采用的是sendMessageInTransaction方法,返回结果为TransactionSendResult对象,该对象中包含了事务发送的状态、本地事务执行的状态等//参数一:topic;参数二:消息// 事务idString[] tags = {"tagA", "tagB", "tagC"};int i = id%3;String transactionId = UUID.randomUUID().toString();String message = "我是事务消息:" + LocalDateTime.now();TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction("topic_test:" + tags[i], MessageBuilder.withPayload(message).setHeader(RocketMQHeaders.TRANSACTION_ID,transactionId).build(),// 给本地事务的参数2);//发送状态String sendStatus = result.getSendStatus().name();//本地事务执行状态String localState = result.getLocalTransactionState().name();log.info("发送状态:"+sendStatus+";本地事务执行状态"+localState);}

4.7.2 继承 RocketMQLocalTransactionListener

@Slf4j
@RocketMQTransactionListener
public class MyTransactionListener implements RocketMQLocalTransactionListener {@Overridepublic RocketMQLocalTransactionState executeLocalTransaction(org.springframework.messaging.Message message, Object o) {MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info("执行本地事务 ,transactionId is {}, orderId is {}",transactionId, message.getHeaders().get("rocketmq_TAGS"));try{//模拟网络波动Thread.sleep(3000);/**** 首先发送一个半消息(half message),这个消息不会立即投递给消费者;然后执行本地事务(比如数据库操作)。* 根据本地事务的执行结果,决定是提交(commit)还是回滚(rollback)这个消息。* 如果本地事务成功,消息会被提交并发送给消费者;* 如果失败,消息会被回滚,消费者不会接收到这个消息*/}catch (Exception e){return RocketMQLocalTransactionState.ROLLBACK;}// 执行本地事务String tag = String.valueOf(message.getHeaders().get("rocketmq_TAGS"));if (StringUtils.equals("tagA", tag)){//这里只讲TAGA消息提交,状态为可执行return RocketMQLocalTransactionState.COMMIT;}else if (StringUtils.equals("tagB", tag)) {return RocketMQLocalTransactionState.ROLLBACK;} else if (StringUtils.equals("tagC",tag)) {return RocketMQLocalTransactionState.UNKNOWN;}log.info("事务提交,消息正常处理: " + LocalDateTime.now());//执行成功,可以提交事务return RocketMQLocalTransactionState.COMMIT;}@Overridepublic RocketMQLocalTransactionState checkLocalTransaction(org.springframework.messaging.Message message) {MessageHeaders headers = message.getHeaders();//获取事务IDString transactionId = (String) headers.get(RocketMQHeaders.TRANSACTION_ID);log.info(transactionId + ",消息回查"+ LocalDateTime.now());return RocketMQLocalTransactionState.ROLLBACK;}
}

tagA、tagB、tagC 三种事务消息,只有Commit的才能发送到broker 

 

 5. 消费端

/*** topic指定消费的主题,consumerGroup指定消费组,* 一个主题可以有多个消费者组,一个消息可以被多个不同的组的消费者都消费*  2.实现RocketMQListener接口*  如果想拿到消息的其他参数可以写成MessageExt*  selectorExpression = "tagA || tagB" 指定tag 的消费*/
@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}")
public class RocketMqConsumer implements RocketMQListener<String>{@Overridepublic void onMessage(String s) {log.info("topic_test: 所有的收到消息:"+s);}}

6.广播消费模式

生产端是一样的,但是消费端需要增加一个参数

messageModel:设置消费模式,取值范围CLUSTERING(集群消费)、BROADCASTING(广播消费)
@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
public class RocketMqConsumer implements RocketMQListener<String>{@Overridepublic void onMessage(String s) {log.info("consumer2---topic_test: 所有的收到消息:"+s);}}// 第2个消费者类,他们都是一样的代码,
//为了表示广播,就是一个消息,会被这两个消费者消费@Service
@Slf4j
@RocketMQMessageListener(topic = "${rocketmq.consumer.topic}", consumerGroup = "${rocketmq.consumer.group}", messageModel = MessageModel.BROADCASTING)
public class RocketMqConsumer implements RocketMQListener<String>{@Overridepublic void onMessage(String s) {log.info("consumer1--topic_test: 所有的收到消息:"+s);}}

7.其他

RocketMQ 通过消费者组(Consumer Group)来维护不同消费者的消费进度。每个消费者组都有一个消费进度(offset),用于标记该组下的消费者在某个主题(Topic)和队列(Queue)上已经消费到的位置。所以:不同的消费者组会被视为不同的消费者

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/610863.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

48-基于腾讯云EKS的容器化部署实战

准备工作 在部署IAM应用之前&#xff0c;我们需要做以下准备工作&#xff1a; 开通腾讯云容器服务镜像仓库。安装并配置Docker。准备一个Kubernetes集群。 开通腾讯云容器服务镜像仓库 在Kubernetes集群中部署IAM应用&#xff0c;需要从镜像仓库下载指定的IAM镜像&#xff…

浅谈网络安全威胁与防御策略

企业网络安全威胁概述 外部威胁&#xff1a;来自网络安全威胁&#xff0c;比如DDOS攻击&#xff0c;病毒&#xff0c;sql注入&#xff0c;木马&#xff0c;蠕虫&#xff0c;等网络入侵&#xff0c;网络扫描&#xff0c;垃圾邮件&#xff0c;钓鱼邮件&#xff0c;针对web的攻击…

第24次修改了可删除可持久保存的前端html备忘录:文本编辑框不再隐藏,又增加了哔哩哔哩搜索和必应搜索

第24次修改了可删除可持久保存的前端html备忘录:文本编辑框不再隐藏&#xff0c;又增加了哔哩哔哩搜索和必应搜索. <!DOCTYPE html> <html lang"zh"><head><meta charset"UTF-8"><meta name"viewport" content"…

ssm+vue的实验室课程管理系统(有报告)。Javaee项目,ssm vue前后端分离项目。

演示视频&#xff1a; ssmvue的实验室课程管理系统&#xff08;有报告&#xff09;。Javaee项目&#xff0c;ssm vue前后端分离项目。 项目介绍&#xff1a; 采用M&#xff08;model&#xff09;V&#xff08;view&#xff09;C&#xff08;controller&#xff09;三层体系结构…

vue中预览docx、xlsx、pptx、pdf

前言&#xff1a;其实本来是要做全类型文件预览的&#xff0c;但是一直找不到合适的doc,xlx,ppt预览插件。要是有可以使用的&#xff0c;可以评论推荐给我 我使用的node版本&#xff1a;v18.19.1 参考官网&#xff1a;preview 文件预览 | ran 引入方式&#xff1a; //安装组…

C++设计模式:享元模式(十一)

1、定义与动机 概述&#xff1a;享元模式和单例模式一样&#xff0c;都是为了解决程序的性能问题。面向对象很好地解决了"抽象"的问题&#xff0c;但是必不可免得要付出一定的代价。对于通常情况来讲&#xff0c;面向对象的成本大豆可以忽略不计。但是某些情况&#…

简单了解JVM

一.JVM简介 jvm及Java virtual machineJava虚拟机&#xff0c;它是一个虚构出来的计算机&#xff0c;一种规范。其实抛开这么专业的句子不说&#xff0c;就知道 JVM 其实就类似于一台小电脑运行在 windows 或者 linux 这些操作系统环境下即可。它直接和操作系统进行交互&#…

基于FPGA轻松玩转AI

启动人工智能应用从来没有像现在这样容易&#xff01;受益于像Xilinx Zynq UltraScale MPSoC 这样的FPGA&#xff0c;AI现在也可以离线使用或在边缘部署、使用.可用于开发和部署用于实时推理的机器学习应用&#xff0c;因此将AI集成到应用中变得轻而易举。图像检测或分类、模式…

产品推荐 | iWare基于Xilinx Zynq 7000系列 SODIMM SOM 开发套件

01 产品概述 iWave的Zyng 7000 SoC开发套件包含Xilinx的Z7020基于SoC的SODIMMSOM和PicoITX外形尺寸载体卡。SOM配备512MBDDR3RAM和8GBeMMC闪存支持&#xff0c;千兆以太网PHY和内置的802.11n适用于PS的Wi-Fi/BLE4.0Zynq 7020开发套件载板支持多种板上连接器&#xff0c;可通过…

Hive的简单学习二

一Hive 库的基本操作 1.1 建库 1.默认路径是/user/hive/warehouse 例如 我输入命令 create database text1 则text1出现在 warehouse目录下 2.指定位置创建数据库 create database text2 location /bigdata29/bigdata29db 后面的路径是hdfs的路径 3.最终写法 加上if n…

React状态管理比较原理

一、React状态管理库 按照23年下载使用顺序依次是&#xff1a; ReduxZustandMobXRecoilJotaiValtio 二、各状态管理库简要概述 Redux&#xff1a;Redux 是一个行业标准的状态管理库&#xff0c;它利用 flux 架构来创建不可变的数据存储。 优点 提供可预测的、一致的状态…

python毕业设计django游泳馆管理系统-flask

游泳馆管理系统具有信息管理功能的选择。游泳馆管理系统采用python技术&#xff0c;基于mysql开发&#xff0c;实现了首页&#xff0c;教练信息&#xff0c;培训信息&#xff0c;交流版块&#xff0c;活动公告&#xff0c;个人中心&#xff0c;后台管理等内容进行管理&#xff…