前置知识:
RocketMQ学习笔记(1)—— 基础使用-CSDN博客
7.集成SpringBoot
以上所述功能均是通过RocketMQ的原生API实现的,除此之外SpringBoot对于一些功能进行了封装,使用更加方便
7.1 producer
依赖
<!-- rocketmq的依赖 -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version>
</dependency>
配置文件
rocketmq:name-server: hadoop104:9876producer:group: boot-producer-group
配置了name server的ip以及生产者组的名称
消息发送
常用函数如下:
- 发送同步消息:
syncSend(topic,msg)
- 发送异步消息:
asyncSend(topic,msg,callback)
- 发送单向消息:
sendOneWay(topic,msg)
- 发送延迟消息:
syncSend(topic,msg,timeout,delayLevel)
(这里的timeout与延迟时间无关,是消息发送的超时时间) - 发送顺序消息:
syncSendOrderly(topic,msg,hashKey)
(将hashKey相同的消息放入同一个队列中去) - 发送带tag的消息:
syncSend(topic:tag,msg)
- 发送带key的消息:
MessageBuilder.withPayload(msg).setHeader(RocketMQHeaders.KEYS, key).build();
(key是写在消息头中的)
代码:
@Test
void bootProducer() {//1.同步rocketMQTemplate.syncSend("bootTestTopic","我是一条消息");//2.异步rocketMQTemplate.asyncSend("bootAsyncTopic", "我是一条异步消息", new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("消息发送成功");}@Overridepublic void onException(Throwable e) {System.out.println("消息发送异常");}});//3.单向rocketMQTemplate.sendOneWay("bootOnewayTopic","我是一条单向消息");//4.延迟消息Message<String> msgDelay = MessageBuilder.withPayload("我是一条延迟消息").build();rocketMQTemplate.syncSend("bootMsTopic",msgDelay,3000,2);//5.顺序消息List<MsgModel> msgModels = Arrays.asList(new MsgModel("qwer", 1, "下单"),new MsgModel("qwer", 1, "短信"),new MsgModel("qwer", 1, "物流"),new MsgModel("zxcv", 2, "下单"),new MsgModel("zxcv", 2, "短信"),new MsgModel("zxcv", 2, "物流"));//发送时一般以json格式进行处理msgModels.forEach(msgModel -> {rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel),msgModel.getOrderSn());});//6.带tag的消息rocketMQTemplate.syncSend("bootTgTopic:tagA","我是tagA的消息");//7.带key的消息Message<String> msgWithKey = MessageBuilder.withPayload("我是带key的消息").setHeader(RocketMQHeaders.KEYS, "thisIsAKey").build();rocketMQTemplate.syncSend("bootKeyTopic",msgWithKey);}
7.2 consumer
依赖
<!-- rocketmq的依赖 -->
<dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.2</version>
</dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>2.0.25</version>
</dependency>
配置文件
server:port: 8081
rocketmq:name-server: hadoop104:9876
设置springboot的端口号以及name server的ip
代码
基础结构
@Component
@RocketMQMessageListener(topic = "bootTestTopic",consumerGroup = "boot-test-consumer-group"
)
public class ABootSimpleListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println(Arrays.toString(messageExt.getBody()));}
}
1.监听器的使用:
通过注册监听器的形式监听topic中的内容并进行消费,主要是通过继承RocketMQListener
类,实现onMessage
方法来完成的;
RocketMQListener
的泛型如果指定了固定的类型,那么onMessage
方法中的参数类型与泛型指定的类型一致;MessageExt
类型代表了消息的全部内容
如果消息消费过程中没有报错,则签收;如果报错了,则拒收并重试
2.注解的使用
通过@RocketMQMessageListener
注解来为监听器设置一些属性:
如topic,设置订阅的主题;consumerGroup,设置消费者组
接下来发送一条简单的消息:rocketMQTemplate.syncSend("bootTestTopic","我是一条消息");
然后开启consumer监听器的springboot Application,即可进行消费:
顺序消息接收
@Component
@RocketMQMessageListener(topic = "bootOrderlyTopic",consumerGroup = "boot-orderly-consumer-group",consumeMode = ConsumeMode.ORDERLY, //顺序消费模式 单线程maxReconsumeTimes = 5 //消费重试的次数
)
public class BOrderlyListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {MsgModel msgModel = JSON.parseObject(new String(messageExt.getBody()), MsgModel.class);System.out.println(msgModel.toString());}
}
需要在注解中添加consumeMode = ConsumeMode.ORDERLY
,表示以单线程模式进行消费,避免因为并发导致的顺序错误
运行结果如下:
带tag的消息接收
@Component
@RocketMQMessageListener(topic = "bootTagTopic",consumerGroup = "boot-tag-consumer-group",selectorType = SelectorType.TAG,selectorExpression = "tagA"
)
public class CTagListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println(new String(messageExt.getBody()));}
}
需要通过selectorType
指定过滤模式,默认使用tag进行过滤,还可以选择SQL92
模式,但一般不用;
通过selectorExpression
来设置用于过滤的表达式,默认是*
,如果需要选取多个tag使用||
分割即可,如tagA || tagB
运行结果如下:
7.3 两种消费模式
Rocketmq消息消费的模式分为两种:负载均衡模式和广播模式
- 负载均衡模式表示多个消费者交替消费同一个主题里面的消息
- 广播模式表示每个每个消费者都消费一遍订阅的主题的消息
负载均衡模式
需要在注解中设置:messageModel = MessageModel.CLUSTERING
会将消息均匀地发送到各个队列中去
示例代码:
生产者:
@Testvoid modeTest() throws Exception{for (int i = 1; i <= 30; i++) {rocketMQTemplate.syncSend("bootModeTopic", ("我是第" + i + "个消息").getBytes());}}
消费者:
@Component
@RocketMQMessageListener(topic = "bootModeTopic",consumerGroup = "boot-mode-consumer-group-cluster",messageModel = MessageModel.CLUSTERING //集群模式 负载均衡
)
public class cluster1 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("我是cluster组的第一个消费者,消息内容是:" + new String(messageExt.getBody()));}
}
一共启动3个消费者,处理逻辑完全相同;
运行结果如下:
我是cluster组的第二个消费者,消息内容是:我是第1个消息
我是cluster组的第一个消费者,消息内容是:我是第2个消息
我是cluster组的第一个消费者,消息内容是:我是第3个消息
我是cluster组的第三个消费者,消息内容是:我是第4个消息
我是cluster组的第二个消费者,消息内容是:我是第5个消息
我是cluster组的第一个消费者,消息内容是:我是第6个消息
我是cluster组的第一个消费者,消息内容是:我是第7个消息
我是cluster组的第三个消费者,消息内容是:我是第8个消息
我是cluster组的第二个消费者,消息内容是:我是第9个消息
我是cluster组的第一个消费者,消息内容是:我是第10个消息
我是cluster组的第一个消费者,消息内容是:我是第11个消息
我是cluster组的第三个消费者,消息内容是:我是第12个消息
我是cluster组的第二个消费者,消息内容是:我是第13个消息
我是cluster组的第一个消费者,消息内容是:我是第14个消息
我是cluster组的第一个消费者,消息内容是:我是第15个消息
我是cluster组的第三个消费者,消息内容是:我是第16个消息
我是cluster组的第二个消费者,消息内容是:我是第17个消息
我是cluster组的第一个消费者,消息内容是:我是第18个消息
我是cluster组的第一个消费者,消息内容是:我是第19个消息
我是cluster组的第三个消费者,消息内容是:我是第20个消息
我是cluster组的第二个消费者,消息内容是:我是第21个消息
我是cluster组的第一个消费者,消息内容是:我是第22个消息
我是cluster组的第一个消费者,消息内容是:我是第23个消息
我是cluster组的第一个消费者,消息内容是:我是第26个消息
我是cluster组的第三个消费者,消息内容是:我是第24个消息
我是cluster组的第二个消费者,消息内容是:我是第25个消息
我是cluster组的第一个消费者,消息内容是:我是第27个消息
我是cluster组的第三个消费者,消息内容是:我是第28个消息
我是cluster组的第二个消费者,消息内容是:我是第29个消息
我是cluster组的第一个消费者,消息内容是:我是第30个消息
从面板中可以看到,消息均衡地发送到各个消息队列中去:
根据下图所示消费方式:
显然有一个消费者订阅了两个队列中地数据,剩下的两个消费者各订阅一个队列中的数据;
广播模式
需要在注解中设置:messageModel = MessageModel.BROADCASTING
示例代码如下:
生产者:
//广播模式@Testvoid modeTest2() throws Exception{for (int i = 1; i <= 5; i++) {rocketMQTemplate.syncSend("bootModeTopic", ("我是第" + i + "个消息").getBytes());}}
消费者:
@Component
@RocketMQMessageListener(topic = "bootModeTopic",consumerGroup = "boot-mode-consumer-group-broadcast",messageModel = MessageModel.BROADCASTING //集群模式 负载均衡
)
public class broadcast1 implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println("我是broadcast组的第一个消费者,消息内容是:" + new String(messageExt.getBody()));}
}
一共启动3个消费者,处理逻辑完全相同;
7.4 消息堆积问题
一般认为单条队列消息差值>=10w时 出现消息堆积问题
问题出现原因
1.生产太快了 ,解决方法:
- 生产方可以做业务限流
- 增加消费者数量,但是
消费者数量<=队列数量
,也可以适当的设置最大的消费线程数量(根据IO密集型(2n)/CPU密集型(n+1)) - 动态扩容队列数量,从而增加消费者数量
2.消费者消费出现问题:排查消费者程序的问题
示例代码
可以通过consumeThreadNumber = 40
来设置消费者线程的数量
上述的n即为逻辑处理器的数量:
消费者代码如下:
@Component
@RocketMQMessageListener(topic = "jyTopic",consumerGroup = "jy-consumer-group",consumeThreadNumber = 40,consumeMode = ConsumeMode.CONCURRENTLY
)
public class EJyListener2 implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("我是第二个消费者:" + message);}
}
7.5 消息丢失问题
消息在消费过程中可能会丢失,解决方案如下:
①记录消息状态
具体思路:
- 生产者使用同步发送模式,收到mq的返回值之后向MySQL数据库中写入一条记录,包括:key(唯一标识),createTime(消息生成时间),status=1(消息状态)
- 消费者消费之后,根据相应的key找到这条记录,更新消息的状态,设置status=2
- 设置定时任务,每天执行一次,查询数据库中
createrTime > 1 day and status = 1
的数据,进行补发即可
②设置mq的刷盘机制为同步刷盘
采用同步刷盘可以避免消息在缓冲区(buffer)丢失
③开启trace机制(消息追踪)
- 停止运行broker
- 修改
broker.conf
配置文件,添加配置:traceTopicEnable=true
- 启动broker即可
代码
生产者:
//轨迹追踪@Testvoid traceTest() throws Exception{rocketMQTemplate.syncSend("bootTraceTest","我是一条可追踪轨迹的消息");}
需要在application.yml
文件中配置开启消息轨迹:
消费者:需要设置:enableMsgTrace = true
,开启消费者方的轨迹
@Component
@RocketMQMessageListener(topic = "bootTraceTest",consumerGroup = "boot-trace-consumer-group",consumeMode = ConsumeMode.CONCURRENTLY,enableMsgTrace = true // 开启消费者方的轨迹
)
public class DTraceTest implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {System.out.println(new String(messageExt.getBody()));}
}
消息轨迹详情
可以通过topic和messageId来查看消息轨迹:
默认会将消息轨迹的数据存在RMQ_SYS_TRACE_TOPIC
主题里面:
7.6 安全机制
配置文件
1.开启acl的控制 在broker.conf
中开启aclEnable=true
2.配置账号密码 修改plain_acl.yml
并修改远程连接ip为:whiteRemoteAddress: 192.168.*.*
3.修改控制面板的配置文件,放开52/53行并把49行改为true:
然后上传到服务器的jar包平级目录下即可:
之后重启broker和dashboard即可
注意:启动dashboard时一定要在该jar包目录下,这样才能读取到平级目录下的application.properties配置文件:nohup java -jar rocketmq-dashboard-1.0.0.jar > /opt/module/rocketmq-4.9.2/logs/dashboard.log &
否则会报错:
具体使用
代码中
需要在application.yml
配置文件中设置:
生产者:
消费者:
控制面板
需要输入用户密码才能登录
默认是admin/admin