SpringBoot集成Apache RocketMQ详解

文章目录

  • 0. 前言
  • 1. Spring Boot 集成Apache RocketMQ详细步骤
    • 1.1.添加依赖
    • 1.2.配置RocketMQ
    • 1.3.创建消息生产者(Producer)
    • 1.4.创建消息消费者(Consumer)
  • 2. 测试验证
  • 3. 常见报错
  • 4. 参考文档
  • 5. 源码地址

在这里插入图片描述

0. 前言

上个章节我们学习了RocketMQ的学习环境安装,讲了两种安装方式 1. docker使用官方镜像安装,2.使用源码方式安装。安装教程如下
如果已经安装了RocketMQ 学习环境可以略过此章节《【实践篇(一)】RocketMQ入门之学习环境搭建》
本章节,我们学习Spring Boot 集成Apache RocketMQ。并验证 在SpringBoot应用中展示如何使用Apache RocketMQ的生产者(Producer)进行消息发送。
这段代码实现了以下类型的消息发送:
使用Apache RocketMQ 官方的依赖库 RocketMQTemplate,实现同步、异步等消息。

  1. 同步消息:使用syncSend方法,生产者会等待消息服务器回复确认后才会继续发送下一条消息。

  2. 异步消息:使用asyncSend方法,生产者发送消息后不等待服务器回复,直接发送下一条消息。

  3. 单向消息:使用sendOneWay方法,生产者只负责发送消息,不等待服务器回复,也不关注发送结果。

  4. 顺序消息:使用sendOrderly方法,按照消息的发送顺序进行消费(First-In-First-Out)。

  5. 延迟消息:使用sendDelayed方法,消息被发送后,不会立即被消费,等待特定的延迟时间后,才能被消费。

  6. 批量消息:使用sendBatch方法,一次发送多条消息,可以有效提高发送的吞吐量。

关于RocketMQ消息的消息模型介绍和使用,我专门写了一篇博客,搭建可以了解
《RocketMQ 消息传递模型》https://blog.csdn.net/wangshuai6707/article/details/132863088

1. Spring Boot 集成Apache RocketMQ详细步骤

1.1.添加依赖

在SpringBoot项目的pom.xml文件中添加RocketMQ的依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.15</version><relativePath/></parent><groupId>com.icepip.project</groupId><artifactId>springboot-icepip-rocketMQ-example</artifactId><version>0.0.1-SNAPSHOT</version><name>springboot-icepip-rocketMQ-example</name><description>Spring boot 集成rocketMQ 示例</description><properties><java.version>8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.apache.rocketmq</groupId><artifactId>rocketmq-spring-boot-starter</artifactId><version>2.0.4</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-actuator</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-thymeleaf</artifactId></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

1.2.配置RocketMQ

application.properties文件中配置RocketMQ的相关信息:

rocketmq.name-server=你的RocketMQ服务IP:9876
rocketmq.producer.group=my-producer
# 刚开始未配置 导致超时报错
rocketmq.producer.sendMessageTimeout=10000

1.3.创建消息生产者(Producer)

package com.icepip.project.mqtt.controller;import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import java.util.ArrayList;
import java.util.List;
/***  SpringBoot集成Apache RocketMQ详解* @author 冰点* @version 1.0.0* @date 2023/9/9 17:02*/@RestController
@RequestMapping("/producer")
public class ProducerController {@Autowiredprivate RocketMQTemplate rocketMQTemplate;/*** 同步发送消息到指定主题* @param message* @return*/@GetMapping("/syncSend")public String syncSend(String message) {// 同步发送消息到指定主题rocketMQTemplate.syncSend("test-topic", message);return "Sync message: " + message + " sent";}/*** 异步发送消息到指定主题* @param message* @return*/@GetMapping("/asyncSend")public String asyncSend(String message) {// 异步发送消息到指定主题rocketMQTemplate.asyncSend("test-topic", MessageBuilder.withPayload(message).build(), new SendCallback() {@Overridepublic void onSuccess(SendResult sendResult) {System.out.println("Async message sent successfully, result: " + sendResult);}@Overridepublic void onException(Throwable throwable) {System.err.println("Failed to send async message: " + throwable.getMessage());}}, 3000, 3); // 3000 ms timeout, delay level 3return "Async message: " + message + " sent";}/*** 发送单向消息到指定主题,无需等待Broker的确认* @param message* @return*/@GetMapping("/sendOneWay")public String sendOneWay(String message) {// 发送单向消息到指定主题,无需等待Broker的确认rocketMQTemplate.sendOneWay("test-topic", message);return "OneWay message: " + message + " sent";}// 发送顺序消息@GetMapping("/sendOrderly")public String sendOrderly(String message) {// 发送顺序消息到指定主题rocketMQTemplate.syncSendOrderly("test-topic", message, "order");return "Orderly message: " + message + " sent";}// 发送延迟消息@GetMapping("/sendDelayed")public String sendDelayed(String message) {// 发送延迟消息到指定主题,延迟级别为3rocketMQTemplate.syncSend("test-topic", MessageBuilder.withPayload(message).build(), 1000, 3);return "Delayed message: " + message + " sent";}// 发送批量消息@GetMapping("/sendBatch")public String sendBatch() {List<String> messages = new ArrayList<>();messages.add("message1");messages.add("message2");// 批量发送消息到指定主题rocketMQTemplate.syncSend("test-topic", messages);return "Batch messages sent";}
}

1.4.创建消息消费者(Consumer)

package com.icepip.project.mqtt.handler;import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Service;/*** 定义一个消费者,监听test-topic主题的消息* @author 冰点* @version 1.0.0* @date 2023/9/9 16:29*/@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "my-consumer_test-topic")
public class MyConsumer implements RocketMQListener<String>{// 当收到消息时,该方法将被调用@Overridepublic void onMessage(String message) {System.out.println("Received message: "+ message);}
}

2. 测试验证

在这里插入图片描述
在这里插入图片描述

3. 常见报错

  1. See http://rocketmq.apache.org/docs/faq/ for further details.; nested exception is org.apache.rocketmq.client.exception.MQClientException: Send [3] times, still failed, cost [6386]ms, Topic: test-topic, BrokersSent: [698f11314447, 698f11314447, 698f11314447]
    See http://rocketmq.apache.org/docs/faq/ for further details.] with root cause

org.apache.rocketmq.remoting.exception.RemotingConnectException: connect to <172.17.0.8:10911> failed
解决办法,修改Broker的IP为宿主机IP
进容器修改配置文件,修改完启动服务 。启动之前先kill 掉容器里原来的Broker。
nohup sh mqbroker -c /home/rocketmq/rocketmq-4.9.2/broker.conf &
在这里插入图片描述

4. 参考文档

  1. 官方文档链接:https://rocketmq.apache.org/docs/

  2. GitHub链接:https://github.com/apache/rocketmq-spring

5. 源码地址

我的github https://github.com/wangshuai67/icepip-springboot-action-examples

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/108587.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

uniapp 轮播列表左右滑动,滑动到中间放大

html <!-- 轮播 --><view class"heade"><swiper class"swiper" display-multiple-items3 circulartrue previous-margin1rpxnext-margin1rpx current0 change"swiperChange" ><block v-for"(item,index) in list"…

计算机竞赛 机器视觉opencv答题卡识别系统

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 &#x1f6a9; 答题卡识别系统 - opencv python 图像识别 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f947;学长这里给一个题目综合评分(每项满分5分…

【List篇】ArrayList 的线程不安全介绍

ArrayList 为什么线程不安全&#xff1f; 主要原因是ArrayList是非同步的,没有同步机制,并且其底层实现是基于数组&#xff0c;而数组的长度是固定的。当对 ArrayList 进行增删操作时&#xff0c;需要改变数组的长度&#xff0c;这就会导致多个线程可能同时操作同一个数组&…

离散制造企业如何打造MES管理系统

在当今制造业中&#xff0c;MES生产管理系统越来越受到关注&#xff0c;但在实际应用中也遇到了一些问题。本文分析了离散制造业和流程生产行业的MES应用现状&#xff0c;指出了这两个行业在部署MES管理系统时存在差异的原因&#xff0c;并探讨了如何在离散制造业提升生产效率&…

前端Javascript模块化

&#x1f3ac; 岸边的风&#xff1a;个人主页 &#x1f525; 个人专栏 :《 VUE 》 《 javaScript 》 ⛺️ 生活的理想&#xff0c;就是为了理想的生活 ! 目录 引言 前端模块化的发展历程 1.全局函数式编程 2.命名空间模式 3.CommonJS require函数 module.exports 4.AM…

PMP-项目启动过程组的重要性

一、什么是项目启动过程组 启动过程组包括定义一个新项目或现有项目的一个新阶段&#xff0c;授权开始该项目或阶段的一组过程。启动过程组的目的是&#xff1a;协调相关方期望与项目目的&#xff0c;告知相关方项目范围和目标&#xff0c;并商讨他们对项目及相关阶段的参与将如…

JMeter-BeanShell预处理程序和BeanShell后置处理程序的应用

一、什么是BeanShell&#xff1f; BeanShell是用Java写成的,一个小型的、免费的、可以下载的、嵌入式的Java源代码解释器&#xff0c;JMeter性能测试工具也充分接纳了BeanShell解释器&#xff0c;封装成了可配置的BeanShell前置和后置处理器&#xff0c;分别是 BeanShell Pre…

远程桌面工具

PRemoteM 是一款现代的远程会话管理和启动器&#xff0c;它让你能够在任何时候快速开启一个远程会话。目前 PRemoteM 已支持 微软远程桌面(RDP)、VNC、SSH、Telnet、SFTP, FTP, RemoteApp等协议。 图片 1 PRemoteM 简介 如果你远程连接windows桌面仍旧在使用winR&#xff0c;输…

靶场上新:Openfire身份认证绕过

本文由掌控安全学院-江月投稿 封神台新上线漏洞复现靶场&#xff1a;Openfire身份认证绕过。 漏洞详情&#xff1a; Openfire是采用Java编程语言开发的实时协作服务器&#xff0c;Openfire的管理控制台是一个基于Web的应用程序&#xff0c;被发现可以使用路径遍历的方式绕过…

内网穿透——Windows搭建服务器

文章目录 1.前言2. Emby网站搭建2.1. Emby下载和安装2.2 Emby网页测试 3. 本地网页发布3.1 注册并安装cpolar内网穿透3.2 Cpolar云端设置3.3 Cpolar内网穿透本地设置 4.公网访问测试5.结语 1.前言 在现代五花八门的网络应用场景中&#xff0c;观看视频绝对是主力应用场景之一&…

机器学习基础之《分类算法(6)—决策树》

一、决策树 1、认识决策树 决策树思想的来源非常朴素&#xff0c;程序设计中的条件分支结构就是if-else结构&#xff0c;最早的决策树就是利用这类结构分割数据的一种分类学习方法 2、一个对话的例子 想一想这个女生为什么把年龄放在最上面判断&#xff01;&#xff01;&…

四叶草clover配置工具:Clover Configurator for Mac

Clover Configurator是一款Mac上的工具&#xff0c;用于配置和优化Clover引导加载器。Clover引导加载器是一种用于启动macOS的开源引导加载器。它允许用户在启动时选择操作系统和配置启动选项。 Clover Configurator提供了一个可视化的界面&#xff0c;让用户可以轻松地编辑和…