SpringBoot教程(十五) | SpringBoot集成RabbitMq

SpringBoot教程(十五) | SpringBoot集成RabbitMq

RabbitMq是我们在开发过程中经常会使用的一种消息队列。今天我们来研究研究rabbitMq的使用。

rabbitMq的官网: rabbitmq.com/

rabbitMq的安装这里先略过,因为我尝试了几次都失败了,后面等我安装成功了会把详细的文章发出来。目前是使用公司的环境进行的调试。

1. 一些概念

RabbitMQ是一个开源的消息代理和队列服务器,用来实现各个应用服务间的数据共享(跨平台 ,跨语言)。RabbitMQ是使用erlang语言编写的,并且基于AMQP协议实现。

所有的消息队列产品模型抽象上来说,都是类似的过程。生产者创建消息,然后发布到消息队列中,由消费者进行消费。

而rabbitMQ也是类似的,有生产者,消费者角色。其内部结构如下图所示。

image.png

那么接下来我们就来介绍一下RabbitMQ中的这些概念。

1. Message:

消息,就是我们需要传递和共享的信息,消息由一些列的可选属性组成,包括路由键,优先级,是否持久化等信息

2. Publisher

消息的生产者,也是一个向交换机发布消息的客户端应用程序。

3. Exchange:

交换机,这是RabbitMQ中的一个非常重要的概念,在rabbitMq中,生产者产生的消息都不是直接发送到队列中去的,而是发送到了交换机中,交换机会通过一定的规则绑定队列,交换机会根据相应的路由规则发送给对服务器中的队列。

4. Binding:

绑定, 用于交换机和消息列队之间的关联。一个绑定就是基于路由键(routing-key)将交换机和消息队列连接起来的路由规则。所以可以将交换机理解成一个有绑定有成的路由表。

5. Queue:

消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可以投入一个或多个队列中。消息一直在对队列里边,等待消费者连接到这个队列将其消费。

6. Connection:

网络连接,比如一个TCP连接。

7. Channel

信道,多路复用连接中的一条独立的双向数据流通道。信道是简历在真实的TCP连接内的虚拟连接。AMQP命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成的。因为对于操作系统过来说建立和销毁TCP都是非常昂贵的开销,所以引入了信道的概念,以复用一条TCP连接。

8. Consumer

消息的消费者,表示一个从消息队列中取得消息的客户端应用。

9. Virtual Host

虚拟主机,标识一批交换机、消息队列和相关对象。 虚拟主机是相同的身份认证和加密环境的独立服务器域。 每个vhost本质就是一个mini版的rabbitMQ服务器,拥有自己的队列,交换机,绑定和权限机制。vhost是AMQP概念的基础,必须在连接时指定,RabbitMQ的默认vhost是/.

10. Broker

标识消息队列服务器实体。

2. Exchange类型

Exchange分发消息的时候根据类型的不同分发策略有所区别,目前常见的有四种类型: direct、fanout、topic、headers。 headers匹配AMQP消息的header而不是路由键,此外headers交换机和direct交换机完成一直但是性能差很多,几乎用不到了,所以直接看另外三种类型。

2.1 direct交换机

image.png

消息中的路由键(routing key)如果和Binding中的bing key一致,交换机就将消息发送到队列的队列中。路由键要完全匹配,单个传播。

2.2 fanout

image.png

每个发到fanout类型交换机的消息都会分到所有绑定的队列上去。fanout交换器不处理路由键,只是简单的将队列绑定到交换机上,每个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout类型转发消息是最快的。

2.3 topic

image.png

topic交换机通过模式匹配分配路由的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用.隔开。它同样会识别两个通配符: # 和* 。 #匹配0个或多个单词, * 匹配一个单词

3. springBoot集成RabbitMQ

SpringBoot集成rabbitMQ还是比较简单的,因为springBoot使用RabbitTemplate对常用操作进行了封装。

接下来我们来看一下集成过程。首先导入依赖。

xml复制代码<!-- 无需在parent的配置文件中添加 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

然后在springBoot的配置文件 application.yml中配置rabbitMQ连接信息

yml复制代码server:port: 7890spring:rabbitmq:host: 172.15.33.52port: 5672username: rootpassword: 123456

接下来我们我们分三种交换机进行演示。

3.1 direct

首先是配置类,在配置类中我们需要声明交换机,队列和绑定关系。

java复制代码@Configuration
public class DirectExchangeConfig {public static final String DIRECT_QUEUE = "directQueue";public static final String DIRECT_QUEUE2 = "directQueue2";public static final String DIRECT_EXCHANGE = "directExchange";public static final String DIRECT_ROUTING_KEY = "direct";@Beanpublic Queue directQueue() {return new Queue(DIRECT_QUEUE, true);}@Beanpublic Queue directQueue2() {return new Queue(DIRECT_QUEUE2, true);}@Beanpublic DirectExchange directExchange() {return new DirectExchange(DIRECT_EXCHANGE, true, false);}@Beanpublic Binding bindingDirectExchange(Queue directQueue, DirectExchange directExchange) {return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);}@Beanpublic Binding bindingDirectExchange2(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with(DIRECT_ROUTING_KEY);}}

这里我们创建了一个叫directExchange的交换机,绑定了directQueue和directQueue2两个队列,路由键是direct.

消息的生产者,我们通过一个Controller来进行模拟,直接引用rabbitTemplate

java复制代码@RestController
@Slf4j
@RequestMapping("/direct")
public class DirectController {private final RabbitTemplate rabbitTemplate;public DirectController(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}/*** direct交换机为直连模式交换机*      根据消息携带的路由键将消息投递给对应队列*** @return*/@GetMapping("send")public Object sendMsg() {rabbitTemplate.convertAndSend(DirectExchangeConfig.DIRECT_EXCHANGE, DirectExchangeConfig.DIRECT_ROUTING_KEY, "发送一条测试消息:direct");return "direct消息发送成功!!";}

当我在浏览器访问对应连接的时候,就会生产一条消息发送到directExchange交换机,路由key为:direct, 消息内容为:发送一条测试消息:direct

接下来我们来看消息的消费者。

java复制代码package com.lsqingfeng.action.rabbitmq.direct;import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** @className: DirectQueueListener* @description: 直连交换机的监听器* @author: sh.Liu* @date: 2021-08-23 16:03*/
@Slf4j
@Component
public class DirectQueueListener {/*** 尽管设置了两个消费者,但是只有一个能够消费成功* 多次发送则轮训消费:* DirectReceiver消费者收到消息1  : 发送一条测试消息:direct* DirectReceiver消费者收到消息2  : 发送一条测试消息:direct* DirectReceiver消费者收到消息1  : 发送一条测试消息:direct* DirectReceiver消费者收到消息2  : 发送一条测试消息:direct** 一个交换机可以绑定多个队列。如果通过路由key可以匹配到多个队列,消费的时候也只能有一个进行消费* @param testMessage*/@RabbitHandler@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE)public void process(String testMessage) {System.out.println("DirectReceiver消费者收到消息1  : " + testMessage);}@RabbitHandler@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE)public void process2(String testMessage) {System.out.println("DirectReceiver消费者收到消息2  : " + testMessage);}@RabbitHandler@RabbitListener(queues = DirectExchangeConfig.DIRECT_QUEUE2)public void process3(String testMessage) {System.out.println("DirectReceiver消费者收到消息3  : " + testMessage);}}

当我们访问浏览器生产消息会,观察控制台结果:

DirectReceiver消费者收到消息1 : 发送一条测试消息:direct DirectReceiver消费者收到消息3 : 发送一条测试消息:direct

在发送一次:

DirectReceiver消费者收到消息3 : 发送一条测试消息:direct DirectReceiver消费者收到消息2 : 发送一条测试消息:direct

由于我们又两个队列都绑定了交换机,且routeKey一样,所以会打印两条。要注意direct只有routeKey完全匹配的时候才能被消费,同时每个队列中的消息只会 被消费一次。

3.2 fanout

配置类:

java复制代码@Configuration
public class FanoutExchangeConfig {public static final String FANOUT_QUEUE = "fanoutQueue";public static final String FANOUT_QUEUE2 = "fanoutQueue2";public static final String FANOUT_QUEUE3 = "fanoutQueue3";public static final String FANOUT_EXCHANGE = "fanoutExchange";public static final String FANOUT_ROUTING_KEY = "fanout";@Beanpublic Queue fanoutQueue() {return new Queue(FANOUT_QUEUE, true);}@Beanpublic Queue fanoutQueue2() {return new Queue(FANOUT_QUEUE2, true);}@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange(FANOUT_EXCHANGE, true, false);}@Beanpublic Binding bindingFanoutExchange(Queue fanoutQueue, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);}@Beanpublic Binding bindingFanoutExchange2(Queue fanoutQueue2, FanoutExchange fanoutExchange) {return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}

这里也是用一个Fanout类型的交换机绑定了两个队列,要注意在这种模式下,是不需要指定routing-Key的,因为所有绑定的队列都会收到消息。

生产者代码如下:

java复制代码@RestController
@Slf4j
@RequestMapping("/fanout")
public class FanoutController {private final RabbitTemplate rabbitTemplate;public FanoutController(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}/*** fanout交换机为扇形模式交换机*      消息会发送到所有绑定的队列上。* @return*/@GetMapping("send")public Object sendMsg() {rabbitTemplate.convertAndSend(FanoutExchangeConfig.FANOUT_EXCHANGE, null, "发送一条测试消息:fanout");return "fanout消息发送成功!!";}
}

消息的消费者:

java复制代码@Slf4j
@Component
public class FanoutQueueListener {/*** fanout交换机: 扇型交换机,这个交换机没有路由键概念,就算你绑了路由键也是无视的。 这个交换机在接收到消息后,会直接转发到绑定到它上面的所有队列* 同一个队列监听多次,只会消费一次。* 交换机绑定的多个队列都可以收到消息* @param testMessage*/@RabbitHandler@RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE)public void process(String testMessage) {System.out.println("FanoutReceiver消费者收到消息1  : " + testMessage);}@RabbitHandler@RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE)public void process2(String testMessage) {System.out.println("FanoutReceiver消费者收到消息2  : " + testMessage);}@RabbitHandler@RabbitListener(queues = FanoutExchangeConfig.FANOUT_QUEUE2)public void process3(String testMessage) {System.out.println("FanoutReceiver消费者收到消息3  : " + testMessage);}}

打印结果:

FanoutReceiver消费者收到消息1 : 发送一条测试消息:fanout FanoutReceiver消费者收到消息3 : 发送一条测试消息:fanout

因为方法1和方法2监听的是同一个队列,只有一个可以消费成功。多次执行,两个方法交替执行。

3.3 topic

主题交换机,会根据routing-Key的匹配规则,将消息发送到符合规则的队列中。

配置类:

java复制代码/*** @className: TopicExchangeConfig* @description:* *  (星号) 用来表示一个单词 (必须出现的)* #  (井号) 用来表示任意数量(零个或多个)单词* @author: sh.Liu* @date: 2021-08-23 15:49*/
@Configuration
public class TopicExchangeConfig {public static final String TOPIC_QUEUE = "topicQueue";public static final String TOPIC_QUEUE2 = "topicQueue2";public static final String TOPIC_QUEUE3 = "topicQueue3";public static final String TOPIC_EXCHANGE = "topicExchange";public static final String TOPIC_ROUTING_KEY = "topic*";@Beanpublic Queue topicQueue() {return new Queue(TOPIC_QUEUE, true);}@Beanpublic Queue topicQueue2() {return new Queue(TOPIC_QUEUE2, true);}@Beanpublic Queue topicQueue3() {return new Queue(TOPIC_QUEUE3, true);}@Beanpublic TopicExchange topicExchange() {return new TopicExchange(TOPIC_EXCHANGE, true, false);}@Beanpublic Binding bindingTopicExchange(Queue topicQueue, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue).to(topicExchange).with("topic.#");}@Beanpublic Binding bindingTopicExchange2(Queue topicQueue2, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue2).to(topicExchange).with("test.#");}@Beanpublic Binding bindingTopicExchange3(Queue topicQueue3, TopicExchange topicExchange) {return BindingBuilder.bind(topicQueue3).to(topicExchange).with("#");}
}

这里要注意我们的绑定管关系。分别是topic.#, test.*, #

#: 代表所有,* 代表有且只有一个。

消息的发送者,我们将routingKey作为参数方便我们看效果:

java复制代码@RestController
@Slf4j
@RequestMapping("/topic")
public class TopicController {private final RabbitTemplate rabbitTemplate;public TopicController(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}@GetMapping("send")public Object sendMsg(String routingKey) {rabbitTemplate.convertAndSend(TopicExchangeConfig.TOPIC_EXCHANGE, routingKey, "发送一条测试消息:topic");return "topic消息发送成功!!";}

消息的消费者:

java复制代码/*** @className: TopicQueueListener* @description: 主题交换机的监听器* @author: sh.Liu* @date: 2021-08-23 16:03*/
@Slf4j
@Component
public class TopicQueueListener {/*** topic: 主题交换机* @param testMessage*/@RabbitHandler@RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE)public void process(String testMessage) {System.out.println("TopicReceiver消费者收到消息1  : " + testMessage);}@RabbitHandler@RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE)public void process2(String testMessage) {System.out.println("TopicReceiver消费者收到消息2  : " + testMessage);}@RabbitHandler@RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE2)public void process3(String testMessage) {System.out.println("TopicReceiver消费者收到消息3  : " + testMessage);}@RabbitHandler@RabbitListener(queues = TopicExchangeConfig.TOPIC_QUEUE3)public void process4(String testMessage) {System.out.println("TopicReceiver消费者收到消息4  : " + testMessage);}}

请求:http://localhost:7890/topic/send?routingKey=test.a

结果:

TopicReceiver消费者收到消息3 : 发送一条测试消息:topic TopicReceiver消费者收到消息4 : 发送一条测试消息:topic

代表: test.* 和 # 与路由key匹配成功

请求:http://localhost:7890/topic/send?routingKey=topic.123

TopicReceiver消费者收到消息1 : 发送一条测试消息:topic TopicReceiver消费者收到消息4 : 发送一条测试消息:topic

代表: topic.# 和 # 匹配成功

请求: http://localhost:7890/topic/send?routingKey=test

TopicReceiver消费者收到消息4 : 发送一条测试消息:topic

test.* 后面必须要有一个单词

请求: http://localhost:7890/topic/send?routingKey=test.aaa

TopicReceiver消费者收到消息4 : 发送一条测试消息:topic TopicReceiver消费者收到消息3 : 发送一条测试消息:topic

test.*和 #匹配成功

请求:http://localhost:7890/topic/send?routingKey=test.aaa.b

TopicReceiver消费者收到消息4 : 发送一条测试消息:topic

只对# 匹配成功, 因为test.*只能匹配一个单词,aaa.b代表两个

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/410589.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

flink1.15 维表join guava cache和mysql方面优化

优化前 mysql响应慢,导致算子中数据输出追不上输入,导致显示cpu busy:100% 优化后效果两个图对应两个时刻: - - -- 优化前 select l.id,JSON_EXTRACT(r.msg,$$.key1) as msgv (select id,uid from tb1 l where id?) join (select uid,msg from tb2) r on l.uidr.uid;-- 优化…

计算机网络(超详解!) 第二节 数据链路层(上)

1.数据链路层使用的信道 数据链路层使用的信道主要有以下两种类型&#xff1a; 1.点对点信道&#xff1a;这种信道使用一对一的点对点通信方式。 2.广播信道&#xff1a;这种信道使用一对多的广播通信方式&#xff0c;因此过程比较复杂。广播信道上连接的主机很多&#xff0…

MetaGPT入门(一)

本文在Win11操作系统下进行&#xff0c;工具pycharm 一、环境准备 1.建议使用conda虚拟环境 安装anaconda参考&#xff1a;Windows10下Anaconda的安装_windows anaconda 路径-CSDN博客 打开Anaconda Powershell Prompt命令窗口&#xff0c;输入下面命令&#xff0c;创建3.1…

使用vite框架封装vue3插件,发布到npm

目录 一、vue环境搭建 1、创建App.vue 2、修改main.ts 3、修改vite.config.ts 二、插件配置 1、创建插件 2、开发调试 3、打包配置 4、package.json文件配置 5、执行打包命令 pnpm build 6、修改index.d.ts 目录 一、vue环境搭建 1、创建App.vue 2、修改main.ts 3…

华为设备vlan下配置MSTP,STP选举

核心代码,不同实例&#xff0c;承载不同流量&#xff0c;为每个实例设置一个根网桥达到分流的效果 stp region-config //进入stp区域的设置 region-name R1 //区域命名为R1 instance 1 vlan 10 …

蒙特卡洛概率抽样简介

蒙特卡罗方法是一类对概率分布进行随机抽样的技术。 在许多问题领域中&#xff0c;描述或估计概率分布相对简单&#xff0c;但计算所需的数量却很棘手。这可能是由于多种原因造成的&#xff0c;例如domain的随机性质或随机变量的指数级数量增长。 相反&#xff0c;可以通过使…

uniapp微信小程序投票系统实战 (SpringBoot2+vue3.2+element plus ) -投票帖子明细实现

锋哥原创的uniapp微信小程序投票系统实战&#xff1a; uniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )_哔哩哔哩_bilibiliuniapp微信小程序投票系统实战课程 (SpringBoot2vue3.2element plus ) ( 火爆连载更新中... )共计21条视频…

【FastAPI】请求体

在 FastAPI 中&#xff0c;请求体&#xff08;Request Body&#xff09;是通过请求发送的数据&#xff0c;通常用于传递客户端提交的信息。FastAPI 使得处理请求体变得非常容易。 请求体是客户端发送给 API 的数据。响应体是 API 发送给客户端的数据 注&#xff1a;不能使用 …

Mantle: A Programmable Metadata Load Balancer for the Ceph File System——论文泛读

SC 2015 Paper 元数据论文阅读汇总 问题 优化Ceph的元数据局部性和负载平衡。 现有方法 提高元数据服务性能的最常见技术是在专用的元数据服务器&#xff08;MDS&#xff09;节点之间平衡负载 [16, 25, 26, 21, 28]。常见的方法是鼓励独立增长并减少通信&#xff0c;使用诸…

Qt 状态机框架:The State Machine Framework (一)

传送门: Qt 状态机框架:The State Machine Framework (一) Qt 状态机框架:The State Machine Framework (二) 一、什么是状态机框架 状态机框架提供了用于创建和执行状态图/表[1]的类。这些概念和表示法基于Harel的Statecharts&#xff1a;一种复杂系统的可视化形式&#xff…

12AOP面向切面编程/GoF之代理模式

先看一个例子&#xff1a; 声明一个接口&#xff1a; // - * / 运算的标准接口! public interface Calculator {int add(int i, int j);int sub(int i, int j);int mul(int i, int j);int div(int i, int j); }实现该接口&#xff1a; package com.sunsplanter.prox…

Controller层自定义注解拦截request请求校验

一、背景 笔者工作中遇到一个需求&#xff0c;需要开发一个注解&#xff0c;放在controller层的类或者方法上&#xff0c;用以校验请求参数中(不管是url还是body体内&#xff0c;都要检查&#xff0c;有token参数&#xff0c;且符合校验规则就放行)是否传了一个token的参数&am…