【SpringBoot】第二篇:RocketMq使用

背景:

本文会介绍多种案例,教大家如何使用rocketmq。

一般rocketmq使用在微服务项目中,属于分模块使用。这里使用springboot单体项目来模拟使用。

本文以windows系统来做案例。

下载rocketmq和启动:

RocketMQ 在 windows 上运行 - 知乎 (zhihu.com)icon-default.png?t=N6B9https://zhuanlan.zhihu.com/p/644944370 

一、创建springboot项目

一直next进行下去就可以了。

二、pom文件依赖

   <dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency><!-- 还有其它需要的jar包自由引入(注:fastjson不要使用低于1.2.60版本,会有安全漏洞) --><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.62</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency></dependencies>

 三、代码实现

1.1目录结构

1.2生产者消费者解释

  • 生产者: 

 什么是生产者?就比喻个简单的例子。比如我们要新增用户,那么这个新增保存动作可以认为是生产者,他产生了数据,要将数据保存进数据库。

  • 消费者:

什么是消费者? 就比喻个简单的例子。用户在新增的时候他会调用接口,用于保存到数据库,那么处理这个数据的方法你可以理解为消费者。不过在mq中,生产者是将消息发送到mq服务队列中,会根据主题Topic的不同,发往不同的频道。而消费者只需要监听这个Topic主题即可。只要这个topic有消息来了,那么消费者就会进行消费。后面代码里有详细的注释告知大家如何使用生产者和消费者。

1.3application.yml配置文件

# Tomcat
server:tomcat:uri-encoding: UTF-8max-threads: 1000min-spare-threads: 30servlet:context-path: /port: 8090rocketmq:name-server: 127.0.0.1:9876 # 访问地址producer:group: Pro_Group # 必须指定groupsend-message-timeout: 3000 # 消息发送超时时长,默认3sretry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2

1.4生产者服务

import com.alibaba.fastjson.JSON;
import com.example.rocketmqdemo.model.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.util.CollectionUtils;import java.util.List;@Slf4j
@Component
public class MQProducerService {@Value("${rocketmq.producer.send-message-timeout}")private Integer messageTimeOut;// 建议正常规模项目统一用一个TOPICprivate static final String topic = "RLT_TEST_TOPIC";// 直接注入使用,用于发送消息到broker服务器@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** Tag:用于区分过滤同一主题下的不同业务类型的消息,非常实用* 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等)*/public void send(User user) {rocketMQTemplate.convertAndSend(topic + ":tag1", user);
//        rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(user).build()); // 等价于上面一行}/*** 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)* (msgBody也可以是对象,sendResult为返回的发送结果)*/public SendResult sendMsg(String msgBody) {SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));return sendResult;}/*** 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)* (适合对响应时间敏感的业务场景)*/public void sendAsyncMsg(String msgBody) {rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {// 处理消息发送成功逻辑log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));}@Overridepublic void onException(Throwable throwable) {// 处理消息发送异常逻辑log.info("【sendMsg】sendResult={}", "发送异常" + throwable.getMessage());}});}/*** 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)* 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h*/public void sendDelayMsg(String msgBody, int delayLevel) {rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);}/*** 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)*/public void sendOneWayMsg(String msgBody) {rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());}/*** 发送带tag的消息,直接在topic后面加上":tag"*/public SendResult sendTagMsg(String msgBody) {return rocketMQTemplate.syncSend(topic + ":tag2", MessageBuilder.withPayload(msgBody).build());}/**** 服务生产者,顺序消息* 把消息确保投递到同一条queue* 保证了消息的顺序性*/public void sendFIFOMsg(List<User> users) {//顺序消息//选择器规则构建rocketMQTemplate.setMessageQueueSelector((list, message, o) -> {int id = Integer.valueOf((String) o);int hash = (id % list.size());return list.get(hash);});if (!CollectionUtils.isEmpty(users)) {for (User user : users) {MessageBuilder.withPayload(users.toString()).build();rocketMQTemplate.sendOneWayOrderly(topic+":sendFIFOMsg", user, String.valueOf(user.getId()));}}}
}

1.5消费者服务

import com.alibaba.fastjson.JSON;
import com.example.rocketmqdemo.model.User;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.spring.annotation.ConsumeMode;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;@Slf4j
@Component
public class MQConsumerService {// Tag:用于区分过滤同一主题下的不同业务类型的消息,非常实用// topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意// selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag1", consumerGroup = "Con_Group_One")public class ConsumerSend implements RocketMQListener<User> {// 监听到消息就会执行此方法@Overridepublic void onMessage(User user) {log.info("tag1监听到消息:user={}", JSON.toJSONString(user));}}// 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,// 不然生产者发送一条消息,这两个都会去消费,如果类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", consumerGroup = "Con_Group_Two",selectorExpression = "xxx")public class ConsumerSend2 implements RocketMQListener<String> {@Overridepublic void onMessage(String str) {log.info("ConsumerSend2监听到消息:str={}", str);}}// MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)@Service@RocketMQMessageListener(topic = "RLT_TEST_TOPIC", selectorExpression = "tag2", consumerGroup = "Con_Group_Three")public class Consumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt messageExt) {byte[] body = messageExt.getBody();String msg = new String(body);log.info("tag2监听到消息:msg={}", msg);}}/*** 消费者顺序消费消息* 顺序消费*/@Service@RocketMQMessageListener(consumerGroup = "Orderly-Consumer", topic = "RLT_TEST_TOPIC",selectorExpression = "sendFIFOMsg", consumeMode = ConsumeMode.ORDERLY)public class OrderlyConsumer implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {System.out.println("线程"+Thread.currentThread()+"内容为:"+ new String(message.getBody())+"队列序号:"+message.getQueueId()+",消息msgId:"+message.getMsgId());}}
}

1.6User实体类

import lombok.Data;@Data
public class User {private String id;private String name;private Integer age;private String sex;private String desc;}

1.7开始调用生产者服务、消费者自动监听消费

import com.example.rocketmqdemo.model.User;
import com.example.rocketmqdemo.producer.MQProducerService;
import org.apache.rocketmq.client.producer.SendResult;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;
import java.util.List;@RestController
@RequestMapping("/rocketmq")
public class MqController {@Autowiredprivate MQProducerService mqProducerService;@GetMapping("/send")public void send() {User user = new User();user.setAge(28);user.setName("曹震");user.setSex("男");mqProducerService.send(user);}@GetMapping("/sendTag")public ResponseEntity<SendResult> sendTag() {SendResult sendResult = mqProducerService.sendTagMsg("带有tag的字符消息");return ResponseEntity.ok(sendResult);}@GetMapping("/sendMsg")public ResponseEntity<SendResult> sendMsg() {SendResult sendResult = mqProducerService.sendMsg("曹震测试");return ResponseEntity.ok(sendResult);}@GetMapping("/sendFIFOMsg")public void sendFIFOMsg() {List<User> users = new ArrayList<>();User user = new User();user.setId("1");user.setSex("男");user.setName("曹震");user.setAge(28);user.setDesc("创建订单");users.add(user);User user1 = new User();user1.setId("2");user1.setSex("男");user1.setName("贾耀旗");user1.setAge(25);user1.setDesc("创建订单");users.add(user1);User user2 = new User();user2.setId("1");user2.setSex("男");user2.setName("曹震");user2.setAge(28);user2.setDesc("订单付款");users.add(user2);User user3 = new User();user3.setId("1");user3.setSex("男");user3.setName("曹震");user3.setAge(28);user3.setDesc("订单完成");users.add(user3);User user4 = new User();user4.setId("1");user4.setSex("男");user4.setName("曹震");user4.setAge(28);user4.setDesc("订单推送");users.add(user4);User user5 = new User();user5.setId("2");user5.setSex("男");user5.setName("贾耀旗");user5.setAge(25);user5.setDesc("订单付款");users.add(user5);User user6 = new User();user6.setId("2");user6.setSex("男");user6.setName("贾耀旗");user6.setAge(25);user6.setDesc("订单完成");users.add(user6);mqProducerService.sendFIFOMsg(users);}}

1.8我们来启动服务,看下效果

我们以

@GetMapping("/sendFIFOMsg")
public void sendFIFOMsg() {} 这个方法进行测试。可以看出这里的代码其实是内容顺序是乱的,我们先看调用成功后的结果:

 

线程Thread[ConsumeMessageThread_3,5,main]内容为:{"id":"1","name":"曹震","age":28,"sex":"男","desc":"创建订单"}队列序号:1,消息msgId:A9FE29E30E3800DAD5DC7F03A485001C
2023-08-25 15:55:45.162  INFO 3640 --- [MessageThread_3] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A485001C cost: 1 ms
线程Thread[ConsumeMessageThread_4,5,main]内容为:{"id":"2","name":"贾耀旗","age":25,"sex":"男","desc":"创建订单"}队列序号:2,消息msgId:A9FE29E30E3800DAD5DC7F03A486001E
2023-08-25 15:55:45.164  INFO 3640 --- [MessageThread_4] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A486001E cost: 1 ms
线程Thread[ConsumeMessageThread_4,5,main]内容为:{"id":"2","name":"贾耀旗","age":25,"sex":"男","desc":"订单付款"}队列序号:2,消息msgId:A9FE29E30E3800DAD5DC7F03A4860026
2023-08-25 15:55:45.164  INFO 3640 --- [MessageThread_4] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A4860026 cost: 0 ms
线程Thread[ConsumeMessageThread_4,5,main]内容为:{"id":"2","name":"贾耀旗","age":25,"sex":"男","desc":"订单完成"}队列序号:2,消息msgId:A9FE29E30E3800DAD5DC7F03A4870028
2023-08-25 15:55:45.164  INFO 3640 --- [MessageThread_4] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A4870028 cost: 0 ms
线程Thread[ConsumeMessageThread_5,5,main]内容为:{"id":"1","name":"曹震","age":28,"sex":"男","desc":"订单付款"}队列序号:1,消息msgId:A9FE29E30E3800DAD5DC7F03A4860020
2023-08-25 15:55:45.164  INFO 3640 --- [MessageThread_5] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A4860020 cost: 0 ms
线程Thread[ConsumeMessageThread_5,5,main]内容为:{"id":"1","name":"曹震","age":28,"sex":"男","desc":"订单完成"}队列序号:1,消息msgId:A9FE29E30E3800DAD5DC7F03A4860022
2023-08-25 15:55:45.164  INFO 3640 --- [MessageThread_5] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A4860022 cost: 0 ms
线程Thread[ConsumeMessageThread_5,5,main]内容为:{"id":"1","name":"曹震","age":28,"sex":"男","desc":"订单推送"}队列序号:1,消息msgId:A9FE29E30E3800DAD5DC7F03A4860024
2023-08-25 15:55:45.164  INFO 3640 --- [MessageThread_5] a.r.s.s.DefaultRocketMQListenerContainer : consume A9FE29E30E3800DAD5DC7F03A4860024 cost: 0 ms

 我们可以看到已经进行了消费操作,大家有没有看到同一个id的用户他们消费队列信息是一样的

思考:我们在创建数据的时候,明明数据的顺序不是一致的,我们将消息发送到队列中,这个时候应该是按照FIFO的形式去消费才对,应该是乱的顺序消费才对。为什么这里会把同一个id的信息在一起消费呢?而且还是按照创建订单顺序去消费的?

对了,我们在使用mq的时候会出现两笔订单,处理订单流程顺序的问题,比如:订单1还没有处理完,订单2也发消息给mq了,这时候应该回去消费订单2,那么订单1怎么?这个过程中还可能造成脏数据问题。

那么我们就需要保证订单的顺序消费了,那么顺序消费怎么处理呢?可以看上面代码。我们看到生产者有将用户的id进行hash计算,然后得到值,这个值相同的数据放在同一队列中,这样是不是就保证了消息的顺序消费?

 

四 、思考

我们上面已经保证了数据的顺序消费,那么如何保证数据不丢失呢?如何保证数据重复消费问题?

大家可以思考下。后续我会继续在本文章中进行补充和代码实践。

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/90866.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FireFox禁用HTTP2

问题 最近需要调试接口&#xff0c;但是&#xff0c;Chrome都是强制使用h2协议&#xff0c;即HTTP/2协议。为了排除h2协议排除对接口调用的影响&#xff0c;需要强制浏览器使用HTTP1协议。 解决 FireFox 设置firefox的network.http.http2.enabled为禁用&#xff0c;这样就禁…

文件属性查看和修改学习

这个是链接&#xff0c;相当于快捷方式&#xff0c;指向usr/bin这个目录&#xff0c;链接到这个目录

Apache的简单介绍(LAMP架构+搭建Discuz论坛)

文章目录 1.Apache概述1.1什么是apache1.2 apache的功能及特性1.2.1功能1.2.2特性 1.3 MPM 工作模式1.3.1 prefork模式1.3.2 worker模式1.3.3 event模式 2.LAMP概述2.1 LAMP的组成2.2 LAMP各组件的主要作用2.3 LAMP的工作过程2.4CGI和FastCGI 3.搭建Discuz论坛所需4.编译安装Ap…

Java面试之用两个栈实现队列

文章目录 题目一、什么是队列和栈&#xff1f;1.1队列1.2栈 二、具体实现2.1 思路分析2.2代码实现 题目 用两个栈实现一个队列&#xff0c;实现在队列尾部插入节点和在队列头部删除节点的功能。 一、什么是队列和栈&#xff1f; 1.1队列 队列是一种特殊的线性表&#xff0c;…

思腾云计算

思腾合力是一家定位于为高等院校、科研院所、企业提供思腾系列硬件加速计算服务器以及软件系统集成的公司。公司的主营业务&#xff1a;1.以思腾合力品牌的服务器硬件&#xff0c;从外形服务器从1U到4U到塔式。从算力服务器单个的计算卡到最多一台机器可以支持二十块运算卡。从…

【C++】vector的模拟实现

1、vector的模拟实现.h #pragma oncenamespace My_vector {template<class T>class vector{public:typedef T* iterator; //typedef受访问限定符限制&#xff0c;要放成公有typedef const T* const_iterator;iterator begin(){return _start;}iterator end(){return _fi…

SQL-DQL

-----分组查询----- 1.语法&#xff1a; SELECT 字段列表 FROM 表名 [WHERE 条件 ] GROUP BY 分组字段名 [HAVING 分组后过滤条件]&#xff1b; 2.where与having区别 》执行时机不同&#xff1a;where是分组之前进行过滤&#xff0c;不满足where条件&#xff0c;不参与分组&…

启迪未来:学乐多光屏P90引领儿童智能学习革命

在当今数字化时代&#xff0c;教育方式正经历着巨大的变革&#xff0c;智能硬件为教育领域带来了前所未有的机遇和挑战。学乐多光屏学习机作为一款创新的教育智能硬件产品&#xff0c;以其独特的特点和优势&#xff0c;引领着学习机领域的发展潮流。 1. 多功能融合&#xff1a;…

异或^实现数据加密

异或是一种二进制的位运算&#xff0c;符号以 XOR 或 ^ 表示。 1.1运算规则 相同为0&#xff0c;不同为1&#xff0c;即 1 ^ 1 0 0 ^ 0 0 1 ^ 0 1 由运算规则可知&#xff0c;任何二进制数与零异或&#xff0c;都会等于其本身&#xff0c;即 A ^ 0 A。 1.2 异或性质 …

自动化管理管理工具----Ansible

目录 ​编辑 一、Ansible概念 1.1特点 二、工作机制&#xff08;日常模块&#xff09; 2.1 核心程序 三、Ansible 环境安装部署 四、ansible 命令行模块 4.1command 模块 4.2shell 模块 4.3cron 模块 4.4user 模块 4.5group 模块 4.6copy模块 4.7file模块 4.8ho…

静态路由配置实验(超详细讲解+详细命令行)

系列文章目录 华为数通学习&#xff08;7&#xff09; 前言 一&#xff0c;静态路由配置 二&#xff0c;网络地址配置 AR1的配置&#xff1a; AR2的配置&#xff1a; AR3的配置&#xff1a; 三&#xff0c;测试是否连通 AR1的配置: 讲解&#xff1a; AR2的配置&#…

Postman的高级用法—Runner的使用​

1.首先在postman新建要批量运行的接口文件夹&#xff0c;新建一个接口&#xff0c;并设置好全局变量。 2.然后在Test里面设置好要断言的方法 如&#xff1a; tests["Status code is 200"] responseCode.code 200; tests["Response time is less than 10000…