SpringBoot整合rabbitmq使用案例

RocketMQ(二十四)整合SpringBoot

SpringBoot整合rabbitmq使用案例

    • 一 SpringBoot整合RocketMQ实现消息发送和接收
      • 消息生产者
        • 1)添加依赖
        • 2)配置文件
        • 3)启动类
        • 4)测试类
      • 消息消费者
        • 1)添加依赖
        • 2)配置文件
        • 3)启动类
        • 4)测试类
        • 5)RocketMQMessageListener参数
      • 测试
    • RocketMQ发送同步消息
      • 同步消API介绍
    • RocketMQ发送异步消息
      • 异步消息API介绍
    • RocketMQ发送单向消息
    • RocketMQ消费者广播模式和负载均衡模式
    • RocketMQ实现顺序消息
    • RocketMQ实现延迟消息

一 SpringBoot整合RocketMQ实现消息发送和接收

消息生产者

1)添加依赖
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependency>
2)配置文件
server:port: 8081servlet:context-path: /rocketmq:name-server: 127.0.0.1:9876producer:group: producer-demo1
3)启动类
@SpringBootApplication
public class RocketmqProducerApplication {public static void main(String[] args) {ConfigurableApplicationContext run = SpringApplication.run(RocketmqProducerApplication.class, args);}
}
4)测试类
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest(classes = {RocketmqProducerApplication.class})
public class ProducerTest {@Autowiredprivate RocketMQTemplate rocketMQTemplate;@Testpublic void ConvertAndSendTest() {rocketMQTemplate.convertAndSend("springboot-mq", "hello springboot rocketmq");}}

消息消费者

1)添加依赖
        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId></dependency>
2)配置文件
server:port: 8084servlet:context-path: /rocketmq:name-server: 127.0.0.1:9876consumer:group: consumer-demo1
3)启动类
@SpringBootApplication
public class RocketmqConsumerApplication {public static void main(String[] args) {SpringApplication.run(RocketmqConsumerApplication.class, args);}
}
4)测试类
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "${rocketmq.consumer.group}")
@Component
public class ConsumerService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("收到消息内容:"+message);}
}
5)RocketMQMessageListener参数
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RocketMQMessageListener {String NAME_SERVER_PLACEHOLDER = "${rocketmq.name-server:}";String ACCESS_KEY_PLACEHOLDER = "${rocketmq.consumer.access-key:}";String SECRET_KEY_PLACEHOLDER = "${rocketmq.consumer.secret-key:}";String TRACE_TOPIC_PLACEHOLDER = "${rocketmq.consumer.customized-trace-topic:}";String ACCESS_CHANNEL_PLACEHOLDER = "${rocketmq.access-channel:}";/*** 消费者分组** @return*/String consumerGroup();/*** 主题*/String topic();/*** selectorType:消息选择器类型* - SelectorType.TAG:默认值,根据TAG选择,仅支持表达式格式如:“tag1 || tag2 || tag3”,如果表达式为null或者“*”标识订阅所有消息* - SelectorType.SQL92:根据SQL92表达式选择*/SelectorType selectorType() default SelectorType.TAG;/*** selectorType 对应的表达式*/String selectorExpression() default "*";/*** consumeMode:消费模式* - ConsumeMode.CONCURRENTLY:默认值,并行处理* - ConsumeMode.ORDERLY:按顺序处理*/ConsumeMode consumeMode() default ConsumeMode.CONCURRENTLY;/*** messageMode:消息模型* - MessageModel.CLUSTERING:默认值,集群* - MessageModel.BROADCASTING:广播*/MessageModel messageModel() default MessageModel.CLUSTERING;/*** 最大线程数,默认值 64*/int consumeThreadMax() default 64;/*** 消费失败,最大重试次数* <p>* - 在并发模式中,-1表示16* - 在有序模式中,-1表示整数最大值*/int maxReconsumeTimes() default -1;/*** 消息可能阻止使用线程的最长时间(分钟)*/long consumeTimeout() default 15L;/*** 发送回复消息超时*/int replyTimeout() default 3000;/*** 默认值 ${rocketmq.consumer.access-key:}*/String accessKey() default ACCESS_KEY_PLACEHOLDER;/*** 默认值 ${rocketmq.consumer.secret-key:}*/String secretKey() default SECRET_KEY_PLACEHOLDER;/*** 启用消息轨迹,默认值 false*/boolean enableMsgTrace() default false;/*** 自定义的消息轨迹主题,默认值${rocketmq.consumer.customized-trace-topic:}* 没有配置此配置项则使用默认的主题*/String customizedTraceTopic() default TRACE_TOPIC_PLACEHOLDER;/*** 命名服务器地址,默认值${rocketmq.name-server:}*/String nameServer() default NAME_SERVER_PLACEHOLDER;/*** 默认值${rocketmq.access-channel:}*/String accessChannel() default ACCESS_CHANNEL_PLACEHOLDER;
}

测试

运行生产者中ConvertAndSendTest测试方式,观察消费者监听器日志

RocketMQ发送同步消息

发送同步消息是指producer向 broker 发送消息,执行 API 时同步等待, 直到broker 服务器返回发送结
果;
相对异步发送消息,同步会阻塞线程,性能相对差点,但是可靠性高,这种方式得到广泛使用,比如:
短信通知,邮件通知,站内重要信息通知等。
RocketMQTemplate 给我们提供了syncSend方法(有多个重载),来实现发送同步消息;
下面给一个实例:

    /*** 发送同步消息*/@Testpublic void sendSyncMessageTest() {for (int i = 0; i < 10; i++) {SendResult sendResult = rocketMQTemplate.syncSend("springboot-mq", "rocketmq同步消息!" + i);System.out.println(sendResult);}}

这里执行完发送同步消息返回执行结果 SendResult ;

同步消API介绍

//发送普通同步消息-Object
syncSend(String destination, Object payload)
//发送普通同步消息-Message
syncSend(String destination, Message<?> message)
//发送批量普通同步消息
syncSend(String destination, Collection<T> messages)
//发送普通同步消息-Object,并设置发送超时时间
syncSend(String destination, Object payload, long timeout)
//发送普通同步消息-Message,并设置发送超时时间
syncSend(String destination, Message<?> message, long timeout)
//发送批量普通同步消息,并设置发送超时时间
syncSend(String destination, Collection<T> messages, long timeout)
//发送普通同步延迟消息,并设置超时
syncSend(String destination, Message<?> message, long timeout, int delayLevel)
/*** 批量消息*/
@Test
void asyncSendBatch() {Message<String> msg = MessageBuilder.withPayload("hello world test1").build();List<Message> msgList = Arrays.asList(msg,msg,msg,msg,msg);SendResult res = rocketMQTemplate.syncSend("first-topic-str:tag1", msgList);log.info("批量消息");
}

RocketMQ发送异步消息

发送异步消息是指producer向 broker 发送消息时指定消息发送成功及发送异常的回调方法,调用 API
后立即返回,producer发送消息线程不阻塞 ,消息发送成功或失败的回调任务在一个新的线程中执行

相对发送同步消息,异步消息性能更高,可靠性略差。适合对响应时间要求高的业务场景。
RocketMQTemplate 给我们提供了asyncSend方法(有多个重载),来实现发送异步消息;

    /*** 异步消息-String* 指发送方发出数据后,不等接收方发回响应,接着发送下个数据包* 关键实现异步发送回调接口(SendCallback)* 在执行消息的异步发送时应用不需要等待服务器响应即可直接返回,通过回调接口接收务器响应,并对服务器的响应结果进行处理* 这种方式任然需要返回发送消息任务的执行结果,异步不影响后续任务,不会造成阻塞*/@Testpublic void sendAsyncMessage() {for (int i = 0; i < 10; i++) {rocketMQTemplate.asyncSend("springboot-mq", "rocketmg异步消息!" + i,new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("发送成功!");}@Overridepublic void onException(Throwable throwable) {System.out.println("发送失败!");}});}}

类似发送同步消息,多了一个SendCallback回调接口参数,实现onSuccess和onException方法,分别
表示异步发送成功和失败;
在这里插入图片描述

异步消息API介绍

//发送普通异步消息-Object
asyncSend(String destination, Object payload, SendCallback sendCallback)
//发送普通异步消息-Message
asyncSend(String destination, Message<?> message, SendCallback sendCallback)
//发送普通异步消息-Object,并设置发送超时时间
asyncSend(String destination, Object payload, SendCallback sendCallback, long timeout)
//发送普通异步消息-Message,并设置发送超时时间
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout)
//发送普通异步延迟消息,并设置超时
asyncSend(String destination, Message<?> message, SendCallback sendCallback, long timeout, int delayLevel)

RocketMQ发送单向消息

发送单向消息是 指producer向 broker 发送消息,执行 API 时直接返回,不等待broker 服务器的结果
这种方式主要用在不特别关心发送结果的场景,举例:日志发送
RocketMQTemplate 给我们提供了sendOneWay方法(有多个重载),来实现发送单向消息;
下面给一个实例:

  /*** 单向消息* 特点为只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答* 此方式发送消息的过程耗时非常短,一般在微秒级别* 应用场景:适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集*/@Testpublic void sendOneWayMessage() {for (int i = 0; i < 10; i++) {rocketMQTemplate.sendOneWay("springboot-mq", "rocketmg单向消息!" + i);}}

RocketMQ消费者广播模式和负载均衡模式

在这里插入图片描述
如上图,假如我们有多个消费者,消息生产者发送的消息,是每一个消费者都消费一次呢?还是通过一
些机制,比如轮询机制,每个消息只被某一个消费者消费一次呢?
这里涉及到消费者的消费模式,一种是广播模式,还有一种是负载均衡模式;

  • 广播模式是每个消费者,都会消费消息;
  • 负载均衡模式是每一个消息只会被某一个消费者消费一次;

我们业务上一般用的是负载均衡模式,当然一些特殊场景需要用到广播模式,比如发送一个信息到邮
箱,手机,站内提示;
我们可以通过 @RocketMQMessageListenermessageModel 属性值来设置,
MessageModel.BROADCASTING 是广播模式MessageModel.CLUSTERING 是默认集群负载均衡模式
我们先集群负载均衡测试,加上 messageModel=MessageModel.CLUSTERING


/*** @ClassName: ConsumerService* @Description: 消息消费者* @Author wxl* @Date 2024-04-22* @Version 1.0.0**/
@RocketMQMessageListener(topic = "springboot-mq",consumerGroup = "${rocketmq.consumer.group}",messageModel = MessageModel.CLUSTERING)
@Component
public class ConsumerService implements RocketMQListener<String> {@Overridepublic void onMessage(String message) {System.out.println("收到消息内容:"+message);}
}

RocketMQ实现顺序消息

rocketmq默认发送的消息是进入多个消息队列,然后消费端多线程并发消费,所以默认情况,不是顺序消费消息的;
在这里插入图片描述
有时候,我们需要实现顺序消费一批消息,比如电商系统,订单创建,支付,完成等操作,需要顺序执行;
RocketMQTemplate 给我们提供了SendOrderly方法(有多个重载),来实现发送顺序消息;包括以下:
syncSendOrderly,发送同步顺序消息;
asyncSendOrderly,发送异步顺序消息;
sendOneWayOrderly,发送单向顺序消息;
一般我们用第一种发送同步顺序消息;
在这里插入图片描述
第三个参数hashKe,方法点进去:
在这里插入图片描述
因为broker会管理多个消息队列,这个hashKey参数,主要用来计算选择队列的,一般可以把订单ID,
产品ID作为参数值;
发送到一个队列,这样方便搞顺序队列;
以及消费端接收的时候,默认是并发多线程去接收消息。RocketMQMessageListener有个属性
consumeMode,默认是ConsumeMode.CONCURRENTLY ,我们要改成ConsumeMode.ORDERLY,
单线程顺序接收消息;
下面给具体事例,模拟两个订单发送消息;

消息生产者端:

    /*** 发送同步顺序消息*/public void sendOrderlyMessage() {// hashKey用来计算决定消息发送到哪个消息队列    一般是订单ID,产品ID等rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,创建", "98456231");rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,支付", "98456231");rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456231,完成", "98456231");rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,创建", "98456232");rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,支付", "98456232");rocketMQTemplate.syncSendOrderly("java1234-rocketmq-orderly", "98456232,完成", "98456232");}

消费者端:

@RocketMQMessageListener(topic = "java1234-rocketmq-orderly",consumerGroup = "${rocketmq.consumer.group}", consumeMode = ConsumeMode.ORDERLY)
@Component
public class ConsumerService implements RocketMQListener<String> {@Overridepublic void onMessage(String s) {System.out.println("消费者:收到消息内容:" + s);}
}

运行测试:

在这里插入图片描述

RocketMQ实现延迟消息

延迟消息
对于消息中间件来说,producer 将消息发送到mq的服务器上,但并不希望这条消息马上被消费,
而是推迟到当前时间节点之后的某个时间点,再将消息投递到 queue 中让 consumer 进行消费。

延迟消息的使用场景很多,一种比较常见的场景就是在电商系统中,订单创建后,会有一个等待用
户支付的时间窗口,一般为30分钟,30分钟后 customer 会收到这条订单消息,然后程序去订单表中检
查当前这条订单的支付状态,如果是未支付的状态,则自动清理掉,这样就不需要使用定时任务的方式
去处理了。

Rocket的延迟消息
RocketMQ 支持定时的延迟消息,但是不支持任意时间精度,仅支持特定的 level,例如定时 5s,
10s, 1m 等。其中,level=0 级表示不延时,level=1 表示 1 级延时,level=2 表示 2 级延时,以此类
推。

messageDelayLevel=1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

在这里插入图片描述
我们会发现,所有消息发送方法都有一个带int类型的delayLevel参数重载方法,这个就是设置延迟消息
级别的参数。
同时注意,每个带delayLevel参数的方法,也同时带有long类型的timeout参数,这个是设置消息发送超
时时间,默认是3秒,我们也可以自行设置;
同时还有 Message参数,如果发送这种类型的消息,可以携带具体的消息数据;

    /*** 发送延迟消息*/@Testpublic void sendDelayMessage() {rocketMQTemplate.syncSend("java1234-rocketmq", MessageBuilder.withPayload("rocketmq延迟1秒消息").build(), 3000, 1);rocketMQTemplate.syncSend("java1234-rocketmq", MessageBuilder.withPayload("rocketmq延迟5秒消息").build(), 3000, 2);rocketMQTemplate.syncSend("java1234-rocketmq", MessageBuilder.withPayload("rocketmq延迟10秒消息").build(), 3000, 3);}

运行测试:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/670723.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

软件测试,软件评测师

如果你想考软件评测师证书&#xff0c;那这篇文章可以帮你少走很多弯路&#xff0c;估计你用别人一半的时间备考就可以通过考试&#xff0c;以下为本人亲身经验哈&#xff0c;你可以先收藏后看哦&#xff0c;提前祝你考试过过过。 如果以后想从事一份软件测试工程师的工作&…

【算法系列】链表

目录 常用技巧 常用操作 leetcode/牛客题目 一、移除链表元素 二、反转链表 三、链表的中间结点 四、返回倒数第k个节点 五、合并两个有序链表 六、链表分割 七、链表的回文结构 八、相交链表 九、环形链表 十、环形链表 II 十一、随机链表的复制 十二、两数相加…

淡茶怎么泡?

很多人都知道喝浓茶对身体不好&#xff0c;但是怎么冲泡淡茶却一知半解。按照《品深淡茶冲泡标准》中对绿茶冲泡淡茶的规定&#xff0c;冲泡的茶汤中咖啡碱不得高于31.67mg/100mL&#xff0c;可可碱不得高于2.67mg/mL&#xff0c;茶碱不得高于1.50mg/100mL&#xff0c;茶多酚不…

使用nginx部署Vue项目

前提是后端已经跨域&#xff01; 下载nginx&#xff0c;在路径下使用cmd打开nginx&#xff0c;关闭nginx使用任务管理器details end task 把dist中的文件都放到html文件夹中 打开conf&#xff0c;找到nginx.conf&#xff0c;编辑以下内容 location就是刚才放dist文件的那个文…

ArthasGC日志GCeasy详解

Arthas详解 Arthas是阿里巴巴在2018年9月开源的Java诊断工具,支持JDK6,采用命令行交互模式,可以方便定位和诊断线上程序运行问题.Arthas官方文档十分详细.详见:官方文档 Arthas使用场景 Arthas使用 # github下载arthas wget https://alibaba.github.io/arthas/arthas-boot.j…

Go 语言 ORM 框架之 xorm

1、xorm 1.1、xorm 简介 xorm 是一个简单而强大的Go语言ORM库. 通过它可以使数据库操作非常简便。 特性 支持 struct 和数据库表之间的灵活映射&#xff0c;并支持自动同步事务支持同时支持原始SQL语句和ORM操作的混合执行使用连写来简化调用支持使用ID, In, Where, Limit,…

太速科技-FMC377_双AD9361 射频收发模块

FMC377_双AD9361 射频收发模块 FEATURES&#xff1a; ◆ Coverage from 70M ~ 6GHz RF ◆ Flexible rate 12 bit ADC/DAC ◆ Fully-coherent 4x4 MIMO capability, TDD/FDD ◆ RF ports: 50Ω Matched ◆ support both internal reference and exter…

怎么通过Java语言实现远程控制无人售货柜

怎么通过Java语言实现远程控制无人售货柜呢&#xff1f; 本文描述了使用Java语言调用HTTP接口&#xff0c;实现控制无人售货柜&#xff0c;独立控制售货柜、格子柜的柜门。 可选用产品&#xff1a;可根据实际场景需求&#xff0c;选择对应的规格 序号设备名称厂商1智能WiFi控…

JavaScript注释规范

你好&#xff0c;我是云桃桃。 一个希望帮助更多朋友快速入门 WEB 前端的程序媛。 云桃桃 &#xff0c;大专生&#xff0c;一枚程序媛&#xff0c;感谢关注。回复 “前端基础题”&#xff0c;可免费获得前端基础 100 题汇总&#xff0c;回复 “前端基础路线”&#xff0c;可获…

JSON原生AJAX

文章目录 JSONFastjsonfastjson引入fastjson 常用APIfastjson作用常用API使用实例 ajax和json综合(重要)请求参数和响应数据都是普通字符串响应数据改为json格式请求和响应都是js数据封装到Result类和抽取到BaseController 原生AjaxAJAX的执行流程XMLHttpRequest对象使用原生的…

寻找最佳App分发平台:小猪APP分发脱颖而出

在当今移动应用市场日益饱和的环境下&#xff0c;选择一个合适的App分发平台对于开发者来说至关重要。这不仅关系到应用能否快速触达目标用户&#xff0c;还直接影响到品牌的塑造与市场份额的争夺。本文将深入探讨几大关键因素&#xff0c;帮助开发者判断哪个App分发平台最适合…

07 - 步骤 javaScript代码

简介 JavaScript 代码是通过 JavaScript 脚本步骤来执行 JavaScript 脚本的一种方式。这允许用户在 Kettle 的数据流程中使用 JavaScript 编写自定义的脚本逻辑&#xff0c;用于数据处理、转换、计算等操作。 使用 场景 我需要在数据流加一个字段 createTime 当前时间&…