人生永没有终点。只有等到你瞑目的那一刻,才能说你走完了人生路,在此之前,新的第一次始终有,新的挑战依然在,新的感悟不断涌现。
文章目录
- 一、MQ与RabbitMQ概述
- 1. MQ简述
- 2. MQ的优势
- 3. MQ的劣势
- 4. 常见的MQ产品
- 5. RabbitMQ(兔子MQ😀)
- 二、RabbitMQ安装与配置
- 1. 基于docker快速安装RabbitMQ
- 2. 创建用户和虚拟机
- 三、RabbitMQ快速入门
- 1. 基础环境搭建
- 2. publisher消息发布者实现
- 3. consumer消费者实现
- 四、SpringAMQP与RabbitMQ工作模型
- 1. SpringAMQP概述
- 2. BasicQueue 基本模型(简单模型)
- 3. WorkQueue 工作模型
- 4. Publish、Subscribe 发布订阅模型
- 4.1 Fanout 广播模型
- 4.2 Direct 路由模型
- 4.3 Topic 主题模型
- 5. 消息转换器
- 5.1 使用默认消息转换器发送Object类型消息
- 5.2 使用Jackson消息转换器收发JSON消息
- 5.3 使用默认消息转换器收发JSON消息
一、MQ与RabbitMQ概述
1. MQ简述
MQ(全称:Message Queue)直译是消息队列,是基础数据结构中 “先进先出” 的一种数据结构,也是在消息的传输过程中保存消息的容器(中间件),多用于分布式系统之间进行通信。
一般MQ用来解决系统耦合、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。(AP架构)
总结:
-
消息队列(MQ),是一种中间件,用于存储和传递消息。
-
分布式系统有两种通信方式:直接远程调用(如OpenFeign) 和 借助第三方完成间接通信(如RabbitMQ)。
-
发送方称为生产者,接收方称为消费者。
2. MQ的优势
MQ的优势:(应用解耦、异步、削峰)
- 应用解耦:提高系统容错性和可维护性;
- 异步提速:提升用户体验和系统吞吐量;
- 削峰填谷:提高系统稳定性。
1、应用解耦
2、异步提速
3、削峰填谷(秒杀)
使用MQ之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。
但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”,从而提升系统的稳定性。
3. MQ的劣势
引入MQ会遇到下列问题:
- 消息可靠性问题(如何确保发送的消息至少被消费者消费一次,避免消息丢失问题)
- 延迟消息问题 (如何实现消息的延迟投递,解决方案:使用延时队列、TTL、延迟队列插件实现)
- 高可用问题(如何避免单点MQ故障而导致的不可用问题,解决方案:搭建MQ集群)
- 消息堆积问题(如何解决数百万消息堆积,无法及时消费的问题)
4. 常见的MQ产品
市面上有很多MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ、EMQ(物联网) 等,也有直接使用Redis充当消息队列的场景。在实际技术选型时,需要结合自身需求及MQ产品特点来综合考虑。
- Kafka: 是一个高可用性、高吞吐量的分布式消息系统。它具有持久化、持续性、可扩展性和副本机制,并支持多分区和多消费者组。Kafka适用于大规模的数据流处理,如日志聚合、流处理和实时数据流。在追求可用性和高吞吐能力方面,Kafka是一个不错的选择。
- RocketMQ: 是一个低延迟、高吞吐量的分布式消息队列系统。它提供了可靠的消息传递机制,支持高并发和高可用性的消息发布和订阅。RocketMQ适用于大规模的消息处理和异步通信场景。在追求可用性、可靠性和吞吐能力方面,RocketMQ是一个较好的选择。
- RabbitMQ: 是一个可靠性较高、低延迟的开源消息队列系统。它采用AMQP协议,支持多种消息模式和消息确认机制。RabbitMQ适用于可靠性要求较高的任务和通信场景。在追求可用性、可靠性和低延迟方面,RabbitMQ是一个合适的选择。
追求可用性(高->低):Kafka、 RocketMQ 、RabbitMQ;
追求可靠性:RabbitMQ、RocketMQ;
追求吞吐能力:RocketMQ、Kafka;
追求消息低延迟:RabbitMQ、Kafka。
5. RabbitMQ(兔子MQ😀)
RabbitMQ官网:http://www.rabbitmq.com/
RabbitMQ是基于AMQP协议使用Erlang语言开发的一款消息队列产品。
AMQP (全称Advanced Message Queuing Protocol,表示高级消息队列协议),是一个网络协议,是应用层协议的一个开放标、准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。(同类的协议还有MQTT用于物联网场景下)
在RabbitMQ中,有以下一些角色:
-
Producer(生产者):生产者是指发送消息到RabbitMQ的应用程序。它创建消息并将其发送到交换器。
-
Consumer(消费者):消费者是指从RabbitMQ接收消息的应用程序。它基于订阅的方式从队列中获取消息并进行处理。
-
Exchange(交换器):交换器是消息的路由中心。当生产者发送消息时,通过交换器将消息路由到一个或多个队列。
-
Queue(队列):队列是RabbitMQ中存储消息的地方。消费者从队列中接收消息,并进行处理。
-
Binding(绑定):绑定将交换器与队列相关联。它定义了消息从交换器到队列的路由规则。
-
Broker(代理服务器):代理服务器是RabbitMQ的核心组件,负责接收和传递消息。它负责处理交换器、队列、消息的路由和转发。
-
Channel(信道):信道是RabbitMQ使用的通信通道,生产者和消费者通过信道与代理服务器进行交互。
-
Virtual Host(虚拟主机):虚拟主机在RabbitMQ中用于将不同的应用隔离开来。每个虚拟主机具有自己的交换器、队列和绑定。
这些角色共同组成了RabbitMQ的基本架构。生产者发送消息到交换器,通过绑定将消息路由到队列,消费者从队列中接收消息并进行处理。代理服务器负责消息的接收、传递和路由。信道用于生产者和消费者与代理服务器的通信,而虚拟主机提供了应用隔离的环境。
RabbitMQ工作模式:
文档地址:https://www.rabbitmq.com/getstarted.html
RabbitMQ提供了6种工作模型,但是我们常用的只有5种:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。(第6种RPC远程调用不属于MQ)
JMS (Sun公司提供一套Java操做消息队列的接口)
- JMS(JavaMessage Service),Java消息服务应用程序接口,即Java操作消息中间件的API;
- JMS是JavaEE规范的一种,类比JDBC;
- 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ官方没有提供JMS的实现包,但是开源社区有提供。
二、RabbitMQ安装与配置
1. 基于docker快速安装RabbitMQ
扩展:docker-compose安装rabbitmq:https://gitee.com/aopmin/docker-compose/blob/master/Linux/RabbitMQ/docker-compose.yml
1、拉取镜像
docker pull rabbitmq:3.8-management
2、运行容器
docker run -di \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name rabbitmq \--hostname my-rabbit \-p 15672:15672 \-p 5672:5672 \--restart=always \rabbitmq:3.8-management
- \ 代表换行
- -e 指定环境变量
- -e RABBITMQ_DEFAULT_USER=admin 用户名
- -e RABBITMQ_DEFAULT_PASS=123456 密码
- -v 挂载数据卷
- -p 15672:15672 用于web管理页面使用的端口 (管理员页面,端口15672)
- -p 5672:5672 用于生产和消费端使用的端口(通信端口,也就是在代码里要使用的)
- -di ,d后台运行,i打开控制台交互
- –name mq 容器名字
- –hostname mq (这个参数在单机版mq配不配置都可以,用来设置主机名,搭建集群会用到);
扩展:启动xxx插件(后面会用到这个命令)
# 进入容器
docker exec -it rabbitmq /bin/bash# 启动xxx插件
rabbitmq-plugins enable xxx
RabbitMQ管理端:
管理端访问地址:http://192.168.150.103:15672/
2. 创建用户和虚拟机
1、添加一个新用户:
添加成功后列表会显示该用户,但是这个用户没有操作权限,需要为他创建一个虚拟机:
2、创建虚拟机
为指定用户授权:
最后该用户就可以操作这个虚拟机了:
三、RabbitMQ快速入门
使用传统写法完成简单模的消息传递:(特点:一条消息只能被一个消费者消费)
官方的HelloWorld示例是基于简单消息队列模来实现的,其中包括三个角色:
- publisher:消息发布者,将消息发送到队列queue;
- queue:消息队列,负责接受并缓存消息;
- consumer:订阅队列,处理队列中的消息。
1. 基础环境搭建
1、创建maven工程,并在pom文件中导入如下依赖:
<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEASE</version><relativePath/>
</parent><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--SpringAMQP依赖,可以操作RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
</dependencies>
2、创建子模块publisher(生产者)、consumer(消费者),并编写启动类和yml配置文件:
# 日志输出格式配置
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
2. publisher消息发布者实现
消息收发流程:Connection连接、Channel通道、queue队列和exchange 交换机。
publisher消息发布者实现思路:
- 建立连接
- 创建Channel
- 声明队列
- 发送消息
- 关闭连接和channel
1、编写publisher测试代码:
package cn.aopmin.mq.test;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者(传统写法)** @author 白豆五* @version 2023/07/2* @since JDK8*/
public class PublisherTest {/*** 发送消息** @throws IOException* @throws TimeoutException*/@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost虚拟主机、用户名、密码factory.setHost("192.168.150.103");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("123456");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue"; // 队列名称channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}
2、在建立连接处打断点,并以debug方式启动(方便观察每个组件的创建)
查看连接信息:
回到IDEA继续按F8,查看通道信息:
继续按F8,查看队列信息:
最后直接放行程序,查看队列中的消息:
3. consumer消费者实现
consumer消费者实现思路:
- 建立连接
- 创建Channel
- 声明队列
- 订阅消息
1、编写消费者代码
package cn.aopmin.mq.test;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消费者(传统写法)* @author 白豆五* @version 2023/04/27* @since JDK8*/
public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.103");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("123456");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){// 接收消息的回调函数@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}
2、测试(消费者启动程序后会一直执行,不用的时候将程序结束即可)
四、SpringAMQP与RabbitMQ工作模型
1. SpringAMQP概述
AMQP是消息中间件收发消息的协议(规范),具体实现由各个消息中间厂商实现;(例如 RabbitMQ)
SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。
SpringAMQP的官方地址:https://spring.io/projects/spring-amqp
SpringAMQP提供了三个功能:
- 自动声明队列、交换机及其绑定关系;
- 基于注解的监听器模式,异步接收消息;
- 封装了RabbitTemplate工具,用于发送消息 。
RabbitMQ工作模型:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。
2. BasicQueue 基本模型(简单模型)
使用SpringAMQP实现简单模型的消息收发:
1、在父工程中引入spring-amqp起步依赖:
<!--SpringAMQP:可以操作RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
完整的pom.xml配置:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.aopmin</groupId><artifactId>rabbitmq02-BasicQueue</artifactId><version>1.0.0</version><packaging>pom</packaging><description>springAMQP实现简单模型消息传递</description><modules><module>publisher</module><module>consumer</module></modules><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.2</version><relativePath/></parent><dependencies><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- SpringAMQP:可以操作RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 单元测试 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies><!-- 打包插件 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>
2、消息发送
2.1、在publisher服务的application.yml中添加rabbitmq配置:
# RabbitMQ配置
spring:rabbitmq:host: 192.168.150.103 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123456 # 密码# 日志配置
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
2.2、在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:
package cn.aopmin.mq.test;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.test.context.junit4.SpringRunner;/*** 使用SpringAMQP实现简单模型的消息发送** @author 白豆五* @version 2023/07/2* @since JDK8*/
@SpringBootTest
// @RequiredArgsConstructor // 生成构造方法(构造器注入,要求注入的字段必须final修饰)
public class SpringAmqpTest {/*** RabbitTemplate是SpringAMQP中的核心类,用于实现消息的发送和接收*/@Autowiredprivate RabbitTemplate rabbitTemplate;/*** 测试简单模型的消息发送*/@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}
3、消息接收
3.1、在consumer服务的application.yml中添加rabbitmq配置:
# RabbitMQ配置
spring:rabbitmq:host: 192.168.150.103 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123456 # 密码# 日志配置
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
3.2、在consumer服务的com.baidou.mq.listener
包中创建SpringRabbitListener类:
package cn.aopmin.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息监听类** @author 白豆五* @version 2023/07/2* @since JDK8*/
@Component
public class SpringRabbitListener {/*** 订阅消息** @param msg 消息* @throws InterruptedException*/@RabbitListener(queues = "simple.queue") // 配置要监听的队列: simple.queuepublic void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("消费者接收到消息:【" + msg + "】");}
}
4、测试
先启动consumer服务(启动类),然后再运行publisher服务中发送消息的测试代码。
3. WorkQueue 工作模型
工作队列模型(Work Queue Mode):消息按照一定的策略分配给多个消费者来解决消息堆积问题,适用于任务分发和负载均衡场景。
角色:生产者、队列、消费者
使用SpringAMQP实现工作队列模型的消息收发:
1、在消费者监听类中编写两个方法,监听同一个队列,模拟多个消费者。
package cn.aopmin.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息监听类** @author 白豆五* @version 2023/07/2* @since JDK8*/
@Component
public class SpringRabbitListener {/*编写两个方法监听同一个队列,可以实现多个消费者同时消费一个队列的消息*/@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】");}@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage2(String msg) throws InterruptedException {System.out.println("消费者2接收到消息:【" + msg + "】");}
}
2、模拟生产者发多条消息:
/**
* 测试工作模型发消息
*/
@Test
public void testWork() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello,rabbitmq";// 模拟发送100条消息for (int i = 1; i <= 100; i++) {rabbitTemplate.convertAndSend(queueName, message + i);}System.out.println("消息发送完毕!");
}
3、测试:先启动消费者服务,再执行生产者发送消息的代码。
消费者预取消息限制:
工作模型默认一人一半消息,可以通过修改消费者application.yml文件,配置prefetch属性,控制消费者预取消息的上限:
# RabbitMQ配置
spring:rabbitmq:host: 192.168.150.103 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123456 # 密码listener:simple:prefetch: 1 # 消息预取策略(每次获取一条消息,处理完后再获取下一条)
prefetch属性用于指定消费者一次从RabbitMQ服务器预取的消息数量。通过限制预取消息的数量,你可以控制每个消费者同时处理的消息数量,从而实现负载均衡和资源控制。
4. Publish、Subscribe 发布订阅模型
发布订阅模型特点: 可以通过交换机(exchange)将一条消息发给多个队列(消费者)进行处理。
常见的exchange类型包括:Fanout广播、Direct路由、Topic主题。
交换机的主要作用:
- 接收生产者发送的消息
- 将消息按照规则路由到绑定过的队列中
- 它不能缓存消息,路由失败,消息丢失
SpringAMQP提供了一个Exchange接口,来表示所有不同类型的交换机:
4.1 Fanout 广播模型
Fanout Exchange,交换机会把收到的消息发送给绑定过的所有队列。(队列需要与交换机建立关系,然后才能收到对应消息)
接下来使用SpringAMQP演示Fanout Exchange收发消息:
1、在consumer服务中,利用代码声明队列、交换机,并将两者绑定
package cn.aopmin.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 在消费端声明队列、交换机、绑定关系,这样就不用在rabbitmq管理页面手动创建了,这样在服务启动后springAMQP会自动创建** @author 白豆五* @version 2023/07/2* @since JDK8*/
@Configuration
public class MqConfig {/*** 声明Fanout交换机* 交换机名: exchange.fanout*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("exchange.fanout");}/*** 声明队列* 队列名: fanout.queue1*/@Beanpublic Queue queue1() {return new Queue("fanout.queue1");}/*** 声明队列* 队列名: fanout.queue2*/@Beanpublic Queue queue2() {return new Queue("fanout.queue2");}/*** 绑定关系* 将队列1绑定到Fanout交换机上*/@Beanpublic Binding binding1(FanoutExchange fanoutExchange, Queue queue1) {return BindingBuilder.bind(queue1).to(fanoutExchange);}/*** 绑定关系* 将队列2绑定到Fanout交换机上*/@Beanpublic Binding binding2(FanoutExchange fanoutExchange, Queue queue2) {//参数注入,即参数名就是bean的名字return BindingBuilder.bind(queue2).to(fanoutExchange);}
}
消费者application.yml配置:
# RabbitMQ配置
spring:rabbitmq:host: 192.168.150.103 # 主机名port: 5672 # 端口virtual-host: / # 虚拟主机username: admin # 用户名password: 123456 # 密码# 日志配置
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS
2、在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
package cn.aopmin.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息监听类** @author 白豆五* @version 2023/07/2* @since JDK8*/
@Component
public class FanoutListener {/*编写两个方法,分别监听队列1和队列2*/@RabbitListener(queues = "fanout.queue1")public void listenerFanoutQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenerFanoutQueue2(String msg) throws InterruptedException {System.out.println("消费者2接收到消息:【" + msg + "】");}
}
编写监听类后,启动消费者服务会自动创建交换机和队列组件:
3、在publisher的测试类中编写向exchange.fanout发消息的代码:
/*** 测试广播模式发送消息*/
@Test
public void testFanout() {// 交换机名称String exchangeName = "exchange.fanout";// 消息String message = "hello,rabbitmq";// 发送消息// 第一个参数是交换机名称// 第二个参数是routingKey(路由key),在广播模式下不需要指定// 第三个参数是消息rabbitTemplate.convertAndSend(exchangeName, "", message);System.out.println("消息发送完毕!");
}
执行测试方法,查看运行结果:
4.2 Direct 路由模型
Direct exchange,会将接收到的消息按照规则(Routing key)转发到指定的队列,因此称为路由模式(routes)
在交换机上做了一层规则判断操作。
Fanou模型要求:
- 每一个Queue都与Exchange设置一个BindingKey;
- 发布者发送消息时,指定消息的RoutingKey;
- Exchange会将消息路由到BindingKey与消息RoutingKey一致的队列上。
基于AMQP演示Direct模型::
1、在consumer服务的监听类中,编写两个消费者方法,并在方法上通过@RabbitListener组合注解声明Exchange、Queue、RoutingKey,然后分别监听direct.queue1和direct.queue2队列中的消息:
package cn.aopmin.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import javax.xml.ws.BindingType;/*** 消息监听类 (通过注解的方式声明交换机和队列、及绑定关系)** @author 白豆五* @version 2023/07/2* @since JDK8*/
@Component
public class DirectListener {/*** 在监听方法上通过注解的方式声明交换机和队列、及绑定关系* 队列: 通过@Queue注解创建队列* 交换机: 通过@Exchange注解创建交换机* 绑定关系: 通过bindingkey绑定{"blue", "red"}** @param msg* @throws InterruptedException*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),//创建队列exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),//创建direct交换机key = {"blue", "red"} // bindingkey))public void listenDirect1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),//创建队列exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),//创建direct交换机key = {"yellow", "red"} // bindingkey))public void listenDirect2(String msg) throws InterruptedException {System.out.println("消费者2接收到消息:【" + msg + "】");}
}
2、在publisher中编写测试方法,向exchange.direct发送消息
/**
* 测试路由模式发送消息
*/
@Test
public void testDirect() {// 交换机名称String exchangeName = "exchange.direct";// 消息String message = "helloworld!";// 发送消息// 第一个参数是交换机名称// 第二个参数是routingKey(路由key,发消息时候用的),在路由模式下需要指定// 第三个参数是消息rabbitTemplate.convertAndSend(exchangeName, "blue", "routingKey:blue ---" + message);rabbitTemplate.convertAndSend(exchangeName, "red", "routingKey:red ---" + message);System.out.println("消息发送完毕!");
}
3、测试:启动消费者服务创建交换机和队列,然后执行生产者发消息方法
小节:
1、Direct与Fanout交换机的区别?
-
Fanout相对于Direct更灵活些。
-
Fanout交换机不做判断,收到消息就会广播给绑定的队列。
-
Direct交换机会根据RouthingKey判断,然后路由给满足规则的队列。
-
在Direct模型中,如果多个队列都有相同的RouthingKey,则与Fanout功能类似。
2、基于@RabbitListeneri注解声明队列和交换机有哪些常见注解?
- @QueueBinding 绑定关系
- @Queue 队列
- @Exchange 交换机
4.3 Topic 主题模型
Topic Exchange 与 Direct Exchange类似,区别在与RoutingKey必须是多个单词组成,并且以==.== 分割。(用的最多)
队列与交换机指定BindingKey时可以使用通配符:
#
:表示匹配0或多个单词;例如 china.# 、#.new*
:表示匹配1个单词;
- Queue1:绑定的是
java.#
,因此凡是以java.
开头的routing key
都会被匹配到,例如 java.news、java.blog; - Queue2:绑定的是
#.news
,因此凡是以.news
结尾的routing key
都会被匹配到,例如java.news、weather.news、heihe.weather.news。
基于AMQP演示Topic 模型:
1、在消费端的监听方法上声明交换机和队列、及绑定关系
package cn.aopmin.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import javax.xml.ws.BindingType;/*** 消息监听类 (通过注解的方式声明交换机和队列、及绑定关系)** @author 白豆五* @version 2023/07/2* @since JDK8*/
@Component
public class TopicListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),//创建队列exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),//创建direct交换机key = "java.#" // bindingkey))public void listenTopic1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),//创建队列exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),//创建direct交换机key = "#.news" // bindingkey))public void listenTopic2(String msg) throws InterruptedException {System.out.println("消费者2接收到消息:【" + msg + "】");}
}
2、在生产端,编写发消息方法
/*** 测试主题模式发送消息*/
@Test
public void testTopic() {// 交换机名称String exchangeName = "exchange.topic";// 消息String message = "helloworld!";// 发送消息// 第一个参数是交换机名称// 第二个参数是routingKey(路由key),在路由模式下需要指定// 第三个参数是消息rabbitTemplate.convertAndSend(exchangeName, "java.blog", message);rabbitTemplate.convertAndSend(exchangeName, "java.news", message);System.out.println("消息发送完毕!");
}
3、测试
小节
1、Direct和Topic交换机的区别?
- 相同点:两个交换机都会key进行判断。(即消息的路由key与队列的绑定key进行比较)
- 不同点:Topic队列的绑定key支持通配符更加灵活。Direct队列的绑定key不支持通配符,只能匹配具体key的消息。
5. 消息转换器
默认情况下,Spring会帮我们把发送的任意对象类型消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
但是,Spring默认使用的是JDK序列化,JDK序列化会存在一些问题:
- 数据体积过大;
- 有安全隐患;
- 可读性差。
5.1 使用默认消息转换器发送Object类型消息
1、声明队列、编写监听方法
package cn.aopmin.mq.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 声明队列** @author 白豆五* @version 2023/07/3* @since JDK8*/
@Configuration
public class MqConfig {@Beanpublic Queue ObjectQueue() {return new Queue("object.queue");}
}
@Component
public class ObjectListener {@RabbitListener(queues = "object.queue")public void listenObj(Object obj) {System.out.println("收到消息:" + obj.toString());}
}
2、编写发消息代码
/*** 测试默认消息转换器收发消息(JDK序列化)*/
@Test
public void testDefault() {// 队列名称String queueName = "object.queue";// 对象消息Map<String, Object> message = new HashMap<>();message.put("name", "张三");message.put("age", 18);// 发送消息rabbitTemplate.convertAndSend(queueName, message);System.out.println("消息发送完毕!");
}
测试,查看队列数据:(默认情况下JDK序列化的结果不直观,可以把消息转成json格式发送)
5.2 使用Jackson消息转换器收发JSON消息
Spring提供了org.springframework.amqp.support.converter.MessageConverter
接口来处理对象消息的转换。在AMQP中默认实现是SimpleMessageConverter,而SimpleMessageConverter它基于JDK的ObjectOutputStream完成序列化。
如果我们不想使用默认的消息转换器,只需在生产端和消费端配置MessageConverter类型的Bean即可。
1、生产端和消费端都引入jackson依赖:
<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>
2、生产端和消费的都配置消息转换器
package cn.aopmin.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;//生产端
@SpringBootApplication
public class PublisherApp {public static void main(String[] args) {SpringApplication.run(PublisherApp.class, args);}// 使用json序列化机制,进行消息转换@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
}
package cn.aopmin.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;//消费端
@SpringBootApplication
public class ConsumerApp {public static void main(String[] args) {SpringApplication.run(ConsumerApp.class, args);}// 使用json序列化机制,进行消息转换@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
}
3、修改消费端的监听方法
@Component
public class ObjectListener {@RabbitListener(queues = "object.queue")public void listenObj(Map<String,Object> msg) {System.out.println("收到消息:" + msg);}
}
4、测试
5.3 使用默认消息转换器收发JSON消息
上一种方案,配来配去非常麻烦,而且一旦消息转换器不一样,就不能达到想要的结果。默认情况下,对于字符串类型的消息,默认的JDK消息转换器会使用UTF-8编码将字符串转换为字节数组,并将其作为消息体进行发送。
这样我们在发消息的时候手工将对象序列化为json字符串,在接收消息时再序列化为Java对象即可。
JSON工具类:
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version>
</dependency>
常用方法:
- 序列化:JSON.toJSONString(xxx);
- 反序列化:JSONObject(str,Xxx.class);