RocketMQ学习笔记(2)—— 集成SpringBoot

前置知识:

RocketMQ学习笔记(1)—— 基础使用-CSDN博客

7.集成SpringBoot

以上所述功能均是通过RocketMQ的原生API实现的,除此之外SpringBoot对于一些功能进行了封装,使用更加方便

7.1 producer

依赖

<!-- rocketmq的依赖 -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version>
</dependency>

配置文件

rocketmq:name-server: hadoop104:9876producer:group: boot-producer-group    

配置了name server的ip以及生产者组的名称

消息发送

常用函数如下:

  1. 发送同步消息:syncSend(topic,msg)
  2. 发送异步消息:asyncSend(topic,msg,callback)
  3. 发送单向消息:sendOneWay(topic,msg)
  4. 发送延迟消息:syncSend(topic,msg,timeout,delayLevel)(这里的timeout与延迟时间无关,是消息发送的超时时间)
  5. 发送顺序消息:syncSendOrderly(topic,msg,hashKey)(将hashKey相同的消息放入同一个队列中去)
  6. 发送带tag的消息:syncSend(topic:tag,msg)
  7. 发送带key的消息:MessageBuilder.withPayload(msg).setHeader(RocketMQHeaders.KEYS, key).build();(key是写在消息头中的)

代码:

@Test
void bootProducer() {//1.同步rocketMQTemplate.syncSend("bootTestTopic","我是一条消息");//2.异步rocketMQTemplate.asyncSend("bootAsyncTopic", "我是一条异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("消息发送成功");}@Overridepublic void onException(Throwable e) {System.out.println("消息发送异常");}});//3.单向rocketMQTemplate.sendOneWay("bootOnewayTopic","我是一条单向消息");//4.延迟消息Message<String> msgDelay = MessageBuilder.withPayload("我是一条延迟消息").build();rocketMQTemplate.syncSend("bootMsTopic",msgDelay,3000,2);//5.顺序消息List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1, "下单"),new MsgModel("qwer", 1, "短信"),new MsgModel("qwer", 1, "物流"),new MsgModel("zxcv", 2, "下单"),new MsgModel("zxcv", 2, "短信"),new MsgModel("zxcv", 2, "物流"));//发送时一般以json格式进行处理msgModels.forEach(msgModel -> {rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel),msgModel.getOrderSn());});//6.带tag的消息rocketMQTemplate.syncSend("bootTgTopic:tagA","我是tagA的消息");//7.带key的消息Message<String> msgWithKey = MessageBuilder.withPayload("我是带key的消息").setHeader(RocketMQHeaders.KEYS, "thisIsAKey").build();rocketMQTemplate.syncSend("bootKeyTopic",msgWithKey);}

7.2 consumer

依赖

<!-- rocketmq的依赖 -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version>
</dependency>

配置文件

server:port: 8081
rocketmq:name-server: hadoop104:9876

设置springboot的端口号以及name server的ip

代码

基础结构
@Component
@RocketMQMessageListener(topic = "bootTestTopic",consumerGroup = "boot-test-consumer-group"
)
public class ABootSimpleListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println(Arrays.toString(messageExt.getBody()));}
}

1.监听器的使用:

通过注册监听器的形式监听topic中的内容并进行消费,主要是通过继承RocketMQListener类,实现onMessage方法来完成的;

RocketMQListener的泛型如果指定了固定的类型,那么onMessage方法中的参数类型与泛型指定的类型一致;MessageExt类型代表了消息的全部内容

如果消息消费过程中没有报错,则签收;如果报错了,则拒收并重试

2.注解的使用

通过@RocketMQMessageListener注解来为监听器设置一些属性:

如topic,设置订阅的主题;consumerGroup,设置消费者组

接下来发送一条简单的消息:rocketMQTemplate.syncSend("bootTestTopic","我是一条消息");

然后开启consumer监听器的springboot Application,即可进行消费:

顺序消息接收
@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",consumerGroup = "boot-orderly-consumer-group",consumeMode = ConsumeMode.ORDERLY, //顺序消费模式 单线程maxReconsumeTimes = 5 //消费重试的次数
)
public class BOrderlyListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {MsgModel msgModel = JSON.parseObject(new String(messageExt.getBody()), MsgModel.class);System.out.println(msgModel.toString());}
}

需要在注解中添加consumeMode = ConsumeMode.ORDERLY,表示以单线程模式进行消费,避免因为并发导致的顺序错误

运行结果如下:

带tag的消息接收
@Component
@RocketMQMessageListener(topic = "bootTagTopic",consumerGroup = "boot-tag-consumer-group",selectorType = SelectorType.TAG,selectorExpression = "tagA"
)
public class CTagListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println(new String(messageExt.getBody()));}
}

需要通过selectorType指定过滤模式,默认使用tag进行过滤,还可以选择SQL92模式,但一般不用;

通过selectorExpression来设置用于过滤的表达式,默认是*,如果需要选取多个tag使用||分割即可,如tagA || tagB

运行结果如下:

7.3 两种消费模式

Rocketmq消息消费的模式分为两种:负载均衡模式和广播模式

  • 负载均衡模式表示多个消费者交替消费同一个主题里面的消息
  • 广播模式表示每个每个消费者都消费一遍订阅的主题的消息

负载均衡模式

需要在注解中设置:messageModel = MessageModel.CLUSTERING

会将消息均匀地发送到各个队列中去

示例代码:

生产者:

@Testvoid modeTest() throws Exception{for (int i = 1; i <= 30; i++) {rocketMQTemplate.syncSend("bootModeTopic", ("我是第" + i + "个消息").getBytes());}}

消费者:

@Component
@RocketMQMessageListener(topic = "bootModeTopic",consumerGroup = "boot-mode-consumer-group-cluster",messageModel = MessageModel.CLUSTERING //集群模式 负载均衡
)
public class cluster1 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("我是cluster组的第一个消费者,消息内容是:" + new String(messageExt.getBody()));}
}

一共启动3个消费者,处理逻辑完全相同;

运行结果如下:

我是cluster组的第二个消费者,消息内容是:我是第1个消息
我是cluster组的第一个消费者,消息内容是:我是第2个消息
我是cluster组的第一个消费者,消息内容是:我是第3个消息
我是cluster组的第三个消费者,消息内容是:我是第4个消息
我是cluster组的第二个消费者,消息内容是:我是第5个消息
我是cluster组的第一个消费者,消息内容是:我是第6个消息
我是cluster组的第一个消费者,消息内容是:我是第7个消息
我是cluster组的第三个消费者,消息内容是:我是第8个消息
我是cluster组的第二个消费者,消息内容是:我是第9个消息
我是cluster组的第一个消费者,消息内容是:我是第10个消息
我是cluster组的第一个消费者,消息内容是:我是第11个消息
我是cluster组的第三个消费者,消息内容是:我是第12个消息
我是cluster组的第二个消费者,消息内容是:我是第13个消息
我是cluster组的第一个消费者,消息内容是:我是第14个消息
我是cluster组的第一个消费者,消息内容是:我是第15个消息
我是cluster组的第三个消费者,消息内容是:我是第16个消息
我是cluster组的第二个消费者,消息内容是:我是第17个消息
我是cluster组的第一个消费者,消息内容是:我是第18个消息
我是cluster组的第一个消费者,消息内容是:我是第19个消息
我是cluster组的第三个消费者,消息内容是:我是第20个消息
我是cluster组的第二个消费者,消息内容是:我是第21个消息
我是cluster组的第一个消费者,消息内容是:我是第22个消息
我是cluster组的第一个消费者,消息内容是:我是第23个消息
我是cluster组的第一个消费者,消息内容是:我是第26个消息
我是cluster组的第三个消费者,消息内容是:我是第24个消息
我是cluster组的第二个消费者,消息内容是:我是第25个消息
我是cluster组的第一个消费者,消息内容是:我是第27个消息
我是cluster组的第三个消费者,消息内容是:我是第28个消息
我是cluster组的第二个消费者,消息内容是:我是第29个消息
我是cluster组的第一个消费者,消息内容是:我是第30个消息

从面板中可以看到,消息均衡地发送到各个消息队列中去:

根据下图所示消费方式:

显然有一个消费者订阅了两个队列中地数据,剩下的两个消费者各订阅一个队列中的数据;

广播模式

需要在注解中设置:messageModel = MessageModel.BROADCASTING

示例代码如下:

生产者:

    //广播模式@Testvoid modeTest2() throws Exception{for (int i = 1; i <= 5; i++) {rocketMQTemplate.syncSend("bootModeTopic", ("我是第" + i + "个消息").getBytes());}}

消费者:

@Component
@RocketMQMessageListener(topic = "bootModeTopic",consumerGroup = "boot-mode-consumer-group-broadcast",messageModel = MessageModel.BROADCASTING //集群模式 负载均衡
)
public class broadcast1 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("我是broadcast组的第一个消费者,消息内容是:" + new String(messageExt.getBody()));}
}

一共启动3个消费者,处理逻辑完全相同;

7.4 消息堆积问题

一般认为单条队列消息差值>=10w时 出现消息堆积问题

问题出现原因

1.生产太快了 ,解决方法:

  • 生产方可以做业务限流
  • 增加消费者数量,但是消费者数量<=队列数量,也可以适当的设置最大的消费线程数量(根据IO密集型(2n)/CPU密集型(n+1))
  • 动态扩容队列数量,从而增加消费者数量

2.消费者消费出现问题:排查消费者程序的问题

示例代码

可以通过consumeThreadNumber = 40来设置消费者线程的数量

上述的n即为逻辑处理器的数量:

消费者代码如下:

@Component
@RocketMQMessageListener(topic = "jyTopic",consumerGroup = "jy-consumer-group",consumeThreadNumber = 40,consumeMode = ConsumeMode.CONCURRENTLY
)
public class EJyListener2 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是第二个消费者:" + message);}
}

7.5 消息丢失问题

消息在消费过程中可能会丢失,解决方案如下:

①记录消息状态

具体思路:

  1. 生产者使用同步发送模式,收到mq的返回值之后向MySQL数据库中写入一条记录,包括:key(唯一标识),createTime(消息生成时间),status=1(消息状态)
  2. 消费者消费之后,根据相应的key找到这条记录,更新消息的状态,设置status=2
  3. 设置定时任务,每天执行一次,查询数据库中createrTime > 1 day and status = 1的数据,进行补发即可

②设置mq的刷盘机制为同步刷盘

采用同步刷盘可以避免消息在缓冲区(buffer)丢失

③开启trace机制(消息追踪)

  1. 停止运行broker
  2. 修改broker.conf配置文件,添加配置:traceTopicEnable=true
  3. 启动broker即可

代码

生产者:

    //轨迹追踪@Testvoid traceTest() throws Exception{rocketMQTemplate.syncSend("bootTraceTest","我是一条可追踪轨迹的消息");}

需要在application.yml文件中配置开启消息轨迹:

 

消费者:需要设置:enableMsgTrace = true,开启消费者方的轨迹

@Component
@RocketMQMessageListener(topic = "bootTraceTest",consumerGroup = "boot-trace-consumer-group",consumeMode = ConsumeMode.CONCURRENTLY,enableMsgTrace = true // 开启消费者方的轨迹
)
public class DTraceTest implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println(new String(messageExt.getBody()));}
}
消息轨迹详情

可以通过topic和messageId来查看消息轨迹:

默认会将消息轨迹的数据存在RMQ_SYS_TRACE_TOPIC主题里面:

7.6 安全机制

配置文件

1.开启acl的控制 在broker.conf中开启aclEnable=true

2.配置账号密码 修改plain_acl.yml

并修改远程连接ip为:whiteRemoteAddress: 192.168.*.*

3.修改控制面板的配置文件,放开52/53行并把49行改为true:

然后上传到服务器的jar包平级目录下即可:

之后重启broker和dashboard即可

注意:启动dashboard时一定要在该jar包目录下,这样才能读取到平级目录下的application.properties配置文件:nohup java -jar rocketmq-dashboard-1.0.0.jar > /opt/module/rocketmq-4.9.2/logs/dashboard.log &

否则会报错:

具体使用

代码中

需要在application.yml配置文件中设置:

生产者:

消费者:

控制面板

需要输入用户密码才能登录

默认是admin/admin

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/495639.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

python脚本调用bitcoin-cli接口命令

脚本需求 1、python一个对外接口 2、不同的bitcoin命令通过传不同的参数实现 3、接口及接口的参数依次往后传递 4、日志全部打印到日志文件中并且日志文件按天进行切割 #!/usr/bin/python3from flask import Flask, request, jsonify import subprocess import json import os …

死区过滤器Deadband和DeadZone区别(应用介绍)

死区过滤器的算法和详细介绍专栏也有介绍,这里我们主要对这两个模块的区别和应用场景进行详细介绍 1、死区过滤器 https://rxxw-control.blog.csdn.net/article/details/128521007https://rxxw-control.blog.csdn.net/article/details/128521007 1、Deadband和DeadZone区别…

2024年新提出的算法|鹦鹉优化器(Parrot optimizer):算法及其在医疗问题中的应用

本期介绍一种基于训练后鹦鹉关键行为的高效优化方法——鹦鹉优化器(Parrot Optimizer, PO)。该成果于2024年2月发表在中科院2区top SCI期刊Computers in Biology and Medicine&#xff08;IF7.7&#xff09; 1、简介 鹦鹉优化器&#xff08;PO&#xff09;是一种受训练有素的…

深入理解分库、分表、分库分表

前言 分库分表&#xff0c;是企业里面比较常见的针对高并发、数据量大的场景下的一种技术优化方案&#xff0c;所谓"分库分表"&#xff0c;根本就不是一件事儿&#xff0c;而是三件事儿&#xff0c;他们要解决的问题也都不一样&#xff0c;这三个事儿分别是"只…

LeetCode142. 环形链表 II刷题详解

今天力扣刷到了一个特别有意思的题目&#xff0c;于是就写了下面的题解来加深以下理解。 142. 环形链表 II - 力扣&#xff08;LeetCode&#xff09; 这个可以分为两大步去写&#xff0c;首先要判断链表是否有环&#xff0c;然后如果有环就去找到环的入口&#xff0c;没有环返…

Ubuntu服务器fail2ban的使用

作用&#xff1a;限制ssh远程登录&#xff0c;防止被人爆破服务器&#xff0c;封禁登录ip 使用lastb命令可查看到登录失败的用户及ip&#xff0c;无时无刻的不在爆破服务器 目录 一、安装fail2ban 二&#xff0c;配置fail2ban封禁ip的规则 1&#xff0c;进入目录并创建ssh…

【Azure 架构师学习笔记】-Azure Synapse -- Link for SQL 实时数据加载

本文属于【Azure 架构师学习笔记】系列。 本文属于【Azure Synapse】系列。 前言 Azure Synapse Link for SQL 可以提供从SQL Server或者Azure SQL中接近实时的数据加载。通过这个技术&#xff0c;使用SQL Server/Azure SQL中的新数据能够几乎实时地传送到Synapse&#xff08;…

HTML5 CSS3 提高

一&#xff0c;HTML5的新特性 这些新特性都有兼容性问题&#xff0c;基本是IE9以上版本的浏览器才支持&#xff0c;如果不考虑兼容性问题&#xff0c;可以大量使用这些新特性。 1.1新增语义化标签 注意&#xff1a; 1这种语义化标签主要是针对搜索引擎的 2这些新标签在页面…

Tuning Language Models by Proxy

1、写作动机&#xff1a; 调整大语言模型已经变得越来越耗资源&#xff0c;或者在模型权重是私有的情况下是不可能的。作者引入了代理微调&#xff0c;这是一种轻量级的解码时算法&#xff0c;它在黑盒 大语言模型 之上运行&#xff0c;以达到直接微调模型的结果&#xff0c;但…

《C++进阶--9.继承》

目录 9. 继承 9.1 继承的基本语法 9.2 继承方式 9.3 继承中的对象模型 9.4 继承中构造和析构顺序 9.5 继承同名成员处理方式 9.6 继承同名静态成员处理方式 9.7 多继承语法 9.8 菱形继承 9. 继承 继承是面向对象三大特性之一 有些类与类之间存在特殊的关系&#xff0c…

使用ffmpeg压缩视频

一、到ffmpeg官网下载文件包&#xff1a; Download FFmpeg 下载后找到 bin 下的3个exe文件&#xff0c;复制到自己本机的某个目录下, 如&#xff1a; 二、使用命令行压缩&#xff1a; ffmpeg -i input.mp4 -c:v libx265 -crf 28 -y output.mp4 这条命令使用 FFmpeg 工具对输…

CSS3技巧37:JS+CSS3 制作旋转图片墙

开学了就好忙啊&#xff0c;Three.js 学习的进度很慢。。。 备课备课才是王道。 更一篇 JS CSS3 的内容&#xff0c;做一个图片墙。 其核心要点是把图片摆成这个样子&#xff1a; 看上去这个布局很复杂&#xff0c;其实很简单。其思路是&#xff1a; 所有图片放在一个 div.…