Springboot整合RocketMQ 基本消息处理

目录

1. 同步消息

2. 异步消息

3. 单向消息

4. 延迟消息

5. 批量消息

6. 顺序消息

 7. Tag过滤


导入依赖

       <dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependency>

YAML配置

rocketmq:name-server: localhost:9876     # rocketMq的nameServer地址

1. 同步消息

同步消息是发送消息后等待Broker的响应,确保消息被成功接收。

生产者:

   @AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {SendResult result = rocketMQTemplate.syncSend("test", MessageBuilder.withPayload("同步消息").build());
//        SendResult result = rocketMQTemplate.syncSend("test", "同步消息");System.out.println("发送状态:" + result.getSendStatus() + " 消息id:" + result.getMsgId());}

2. 异步消息

异步消息是发送消息后不等待Broker响应,通过回调函数处理发送结果。

@AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {rocketMQTemplate.asyncSend("test", MessageBuilder.withPayload("异步消息").build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功"+sendResult);}@Overridepublic void onException(Throwable throwable) {System.out.println("发送失败"+throwable);}});}

3. 单向消息

单向消息是发送消息后不等待Broker响应,也没有回调函数。

    @AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {rocketMQTemplate.sendOneWay("test","单向消息");}

4. 延迟消息

延迟消息是设置消息的延迟时间,确保消息在指定时间后才被消费。

 @AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {//在RocketMQ中,timeout(超时时间)是指消息发送的最大等待时间。当你发送一个消息时,系统会等待一定的时间来获取发送结果,这个等待的时间就是超时时间。单位msMessage<String> message = MessageBuilder.withPayload("延迟消息").build();//延迟级别 "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" 2对应5sSendResult result = rocketMQTemplate.syncSend("test", message, 2000, 2);}

5. 批量消息

批量消息是将多个消息打包成一个消息批次发送,提高发送效率。

    @AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {List<String> list = Arrays.asList("blue", "red", "pink", "yello");rocketMQTemplate.syncSend("test",list);}

上面所有生产者对应的消费者代码为:

@Component
@RocketMQMessageListener(topic = "test",consumerGroup = "test-group-consumer")
public class MQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String msgId = message.getMsgId();String msg = new String(message.getBody());System.out.println("消息id:"+msgId+"消息内容:"+msg);}
}

6. 顺序消息

顺序消息是保证同一个消息队列中的消息按顺序消费。

生产者代码:

    @AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {for(int i=0;i<10;i++){rocketMQTemplate.syncSendOrderly("test","顺序消息"+i,"1");}}

消费者代码更改:

@Component
@RocketMQMessageListener(topic = "test",consumerGroup = "test-group-consumer",consumeMode = ConsumeMode.ORDERLY)
public class MQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String msgId = message.getMsgId();String msg = new String(message.getBody());System.out.println("消息id:"+msgId+"消息内容:"+msg);}
}

 7. Tag过滤

消费者订阅的Tag和发送者设置的消息Tag相互匹配,则消息被投递给消费端进行消费。

生产者

    @AutowiredRocketMQTemplate rocketMQTemplate;@Testvoid contextLoads() {rocketMQTemplate.syncSend("test:test","hello");}

消费者

@Component
@RocketMQMessageListener(topic = "test",consumerGroup = "test-group-consumer",selectorType = SelectorType.TAG,selectorExpression = "test")
public class MQMsgListener implements RocketMQListener<MessageExt> {@Overridepublic void onMessage(MessageExt message) {String msgId = message.getMsgId();String msg = new String(message.getBody());System.out.println("消息id:"+msgId+"消息内容:"+msg);}
}

 @RocketMQMessageListener 注解参数如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/322784.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

GraphQL和REST API的区别

我的新书《Android App开发入门与实战》已于2020年8月由人民邮电出版社出版&#xff0c;欢迎购买。点击进入详情 GraphQL&#xff08;Graph Query Language&#xff09;和REST&#xff08;Representational State Transfer&#xff09;是两种用于构建和设计API的不同方法。以下…

UseContentHash选项能否在打包AssetBundle时计算可靠的Hash

1&#xff09;UseContentHash选项能否在打包AssetBundle时计算可靠的Hash 2&#xff09;如何清理Native Reserved部分的内存 3&#xff09;Addressables资源完整性校验 4&#xff09;通过Image.color和CanvasRenderer.SetColor修改UI组件颜色的区别 这是第368篇UWA技术知识分享…

如何利用PLC网关实现PLC远程调试?

在工业自动化领域&#xff0c;PLC&#xff08;可编程逻辑控制器&#xff09;是核心组成部分。但传统PLC调试方式往往需要工程师亲临现场&#xff0c;这不仅耗时&#xff0c;还增加了成本。好消息是&#xff0c;借助PLC网关&#xff0c;我们可以实现PLC的远程调试&#xff01;今…

Linux编辑器vim的基本操作(详解及GIF演示)

&#x1f4ab;Linux开发工具vim 在我们初学某门语言时可能接触过使用记事本编辑代码&#xff0c;在之后我们开始接触visual studio等集成开发环境&#xff0c;对于这种基于图形化界面的编辑工具我们可以说已经十分熟悉了&#xff0c;那么接下来我们就来介绍一下Linux中的编辑器…

Prometheus实战篇:Prometheus监控redis

准备环境 docker-compose安装redis docker-compose.yaml version: 3 services:redis:image:redis:5container_name: rediscommand: redis-server --requirepass 123456 --maxmemory 512mbrestart: alwaysvolumes:- /data/redis/data: /dataport:- "6379:6379"dock…

有没有游泳可以戴的耳机?游泳耳机入耳式好,还是骨传导好

游泳是一项既能锻炼身体又能让人放松心情的运动。我们知道&#xff0c;音乐能够为我们的水上时光增添更多的乐趣。那么&#xff0c;在众多游泳耳机中&#xff0c;如何选择一款既适合自己的需求又具备良好性能的产品呢&#xff1f; 首先&#xff0c;我们要了解的是&#xff0c;…

基于ssm校园线上订餐系统的设计与实现论文

摘 要 信息数据从传统到当代&#xff0c;是一直在变革当中&#xff0c;突如其来的互联网让传统的信息管理看到了革命性的曙光&#xff0c;因为传统信息管理从时效性&#xff0c;还是安全性&#xff0c;还是可操作性等各个方面来讲&#xff0c;遇到了互联网时代才发现能补上自古…

docker (portainer 安装nginx)

汉化版步骤可以参考&#xff1a;写文章-CSDN创作中心https://mp.csdn.net/mp_blog/creation/editor/135258056 一、创建容器 二、配置端口&#xff0c;以及容器卷挂载 挂载目录配置&#xff1a;(下方截图的目录如下&#xff0c;docker 改为 mydocker&#xff0c;用docker作为根…

jenkins忘记密码后的操作

1、先停止 jenkins 服务 systemctl stop jenkins 关闭Jenkins服务 或者杀掉进程 ps -ef | grep jenkins &#xff5c;awk {print $2} | grep -v "grep" | xargs kill -9 2、找到 config.xml 文件 find /root -name config.xml3、备份config.xml文件 cp /root/.jen…

【SpringBoot】-Spring MVC项目如何传递参数?

作者&#xff1a;学Java的冬瓜 博客主页&#xff1a;☀冬瓜的主页&#x1f319; 专栏&#xff1a;【Framework】 主要内容&#xff1a;使用SpringBoot的SpringMVC框架传递各种参数&#xff0c;如传对象&#xff0c;传表单&#xff0c;传文件。后端对前端的请求信息的获取&#…

shell sshpass 主机交互 在另外一台主机上执行某个命令 批量管理主机 以及一些案例

目录 作用安装 sshpasssshpass 用法在远程主机执行某个命令 案例批量传输密匙批量拷贝文件批量修改密码 作用 就是用一台主机 控制另外一台主机免交互任务管理工具方便批量管理主机使用方法就是在ssh 前边加一个 sshpass 安装 sshpass # 安装 sshpass yum -y install sshpas…

【MySQL】数据库之MHA高可用

目录 一、MHA 1、什么是MHA 2、MHA 的组成 3、MHA的特点 4、MHA的工作原理 二、有哪些数据库集群高可用方案 三、实操&#xff1a;一主两从部署MHA 1、完成主从复制 步骤一&#xff1a;完成所有MySQL的配置文件修改 步骤二&#xff1a;完成所有MySQL的主从授权&#x…