RabbitMQ入门指南

在这里插入图片描述

人生永没有终点。只有等到你瞑目的那一刻,才能说你走完了人生路,在此之前,新的第一次始终有,新的挑战依然在,新的感悟不断涌现。

文章目录

  • 一、MQ与RabbitMQ概述
    • 1. MQ简述
    • 2. MQ的优势
    • 3. MQ的劣势
    • 4. 常见的MQ产品
    • 5. RabbitMQ(兔子MQ😀)
  • 二、RabbitMQ安装与配置
    • 1. 基于docker快速安装RabbitMQ
    • 2. 创建用户和虚拟机
  • 三、RabbitMQ快速入门
    • 1. 基础环境搭建
    • 2. publisher消息发布者实现
    • 3. consumer消费者实现
  • 四、SpringAMQP与RabbitMQ工作模型
    • 1. SpringAMQP概述
    • 2. BasicQueue 基本模型(简单模型)
    • 3. WorkQueue 工作模型
    • 4. Publish、Subscribe 发布订阅模型
      • 4.1 Fanout 广播模型
      • 4.2 Direct 路由模型
      • 4.3 Topic 主题模型
    • 5. 消息转换器
      • 5.1 使用默认消息转换器发送Object类型消息
      • 5.2 使用Jackson消息转换器收发JSON消息
      • 5.3 使用默认消息转换器收发JSON消息


一、MQ与RabbitMQ概述


1. MQ简述


MQ(全称:Message Queue)直译是消息队列,是基础数据结构中 “先进先出” 的一种数据结构,也是在消息的传输过程中保存消息的容器(中间件),多用于分布式系统之间进行通信。

一般MQ用来解决系统耦合、异步消息、流量削峰等问题,实现高性能、高可用、可伸缩和最终一致性架构。(AP架构)

在这里插入图片描述

总结:

  • 消息队列(MQ),是一种中间件,用于存储和传递消息。

  • 分布式系统有两种通信方式:直接远程调用(如OpenFeign) 和 借助第三方完成间接通信(如RabbitMQ)。

  • 发送方称为生产者,接收方称为消费者。


2. MQ的优势


MQ的优势:(应用解耦、异步、削峰)

  • 应用解耦:提高系统容错性和可维护性;
  • 异步提速:提升用户体验和系统吞吐量;
  • 削峰填谷:提高系统稳定性。

1、应用解耦

在这里插入图片描述

在这里插入图片描述


2、异步提速

在这里插入图片描述

在这里插入图片描述


3、削峰填谷(秒杀)

在这里插入图片描述

在这里插入图片描述

使用MQ之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在MQ中,高峰就被“削”掉了。

但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做“填谷”,从而提升系统的稳定性。


3. MQ的劣势


引入MQ会遇到下列问题:

  • 消息可靠性问题(如何确保发送的消息至少被消费者消费一次,避免消息丢失问题)
  • 延迟消息问题 (如何实现消息的延迟投递,解决方案:使用延时队列、TTL、延迟队列插件实现)
  • 高可用问题(如何避免单点MQ故障而导致的不可用问题,解决方案:搭建MQ集群)
  • 消息堆积问题(如何解决数百万消息堆积,无法及时消费的问题)

4. 常见的MQ产品


市面上有很多MQ产品,例如RabbitMQ、RocketMQ、ActiveMQ、Kafka、ZeroMQ、MetaMQ、EMQ(物联网) 等,也有直接使用Redis充当消息队列的场景。在实际技术选型时,需要结合自身需求及MQ产品特点来综合考虑。

在这里插入图片描述

  • Kafka: 是一个高可用性、高吞吐量的分布式消息系统。它具有持久化、持续性、可扩展性和副本机制,并支持多分区和多消费者组。Kafka适用于大规模的数据流处理,如日志聚合、流处理和实时数据流。在追求可用性和高吞吐能力方面,Kafka是一个不错的选择。
  • RocketMQ: 是一个低延迟、高吞吐量的分布式消息队列系统。它提供了可靠的消息传递机制,支持高并发和高可用性的消息发布和订阅。RocketMQ适用于大规模的消息处理和异步通信场景。在追求可用性、可靠性和吞吐能力方面,RocketMQ是一个较好的选择。
  • RabbitMQ: 是一个可靠性较高、低延迟的开源消息队列系统。它采用AMQP协议,支持多种消息模式和消息确认机制。RabbitMQ适用于可靠性要求较高的任务和通信场景。在追求可用性、可靠性和低延迟方面,RabbitMQ是一个合适的选择。

追求可用性(高->低):Kafka、 RocketMQ 、RabbitMQ;

追求可靠性:RabbitMQ、RocketMQ;

追求吞吐能力:RocketMQ、Kafka;

追求消息低延迟:RabbitMQ、Kafka。


5. RabbitMQ(兔子MQ😀)


RabbitMQ官网:http://www.rabbitmq.com/

在这里插入图片描述

RabbitMQ是基于AMQP协议使用Erlang语言开发的一款消息队列产品。

AMQP (全称Advanced Message Queuing Protocol,表示高级消息队列协议),是一个网络协议,是应用层协议的一个开放标、准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。(同类的协议还有MQTT用于物联网场景下)


在RabbitMQ中,有以下一些角色:

  • Producer(生产者):生产者是指发送消息到RabbitMQ的应用程序。它创建消息并将其发送到交换器。

  • Consumer(消费者):消费者是指从RabbitMQ接收消息的应用程序。它基于订阅的方式从队列中获取消息并进行处理。

  • Exchange(交换器):交换器是消息的路由中心。当生产者发送消息时,通过交换器将消息路由到一个或多个队列。

  • Queue(队列):队列是RabbitMQ中存储消息的地方。消费者从队列中接收消息,并进行处理。

  • Binding(绑定):绑定将交换器与队列相关联。它定义了消息从交换器到队列的路由规则。

  • Broker(代理服务器):代理服务器是RabbitMQ的核心组件,负责接收和传递消息。它负责处理交换器、队列、消息的路由和转发。

  • Channel(信道):信道是RabbitMQ使用的通信通道,生产者和消费者通过信道与代理服务器进行交互。

  • Virtual Host(虚拟主机):虚拟主机在RabbitMQ中用于将不同的应用隔离开来。每个虚拟主机具有自己的交换器、队列和绑定。

这些角色共同组成了RabbitMQ的基本架构。生产者发送消息到交换器,通过绑定将消息路由到队列,消费者从队列中接收消息并进行处理。代理服务器负责消息的接收、传递和路由。信道用于生产者和消费者与代理服务器的通信,而虚拟主机提供了应用隔离的环境。
在这里插入图片描述

RabbitMQ工作模式:

文档地址:https://www.rabbitmq.com/getstarted.html

RabbitMQ提供了6种工作模型,但是我们常用的只有5种:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。(第6种RPC远程调用不属于MQ)

在这里插入图片描述

JMS (Sun公司提供一套Java操做消息队列的接口)

  • JMS(JavaMessage Service),Java消息服务应用程序接口,即Java操作消息中间件的API;
  • JMS是JavaEE规范的一种,类比JDBC;
  • 很多消息中间件都实现了JMS规范,例如:ActiveMQ。RabbitMQ官方没有提供JMS的实现包,但是开源社区有提供。

二、RabbitMQ安装与配置


1. 基于docker快速安装RabbitMQ


扩展:docker-compose安装rabbitmq:https://gitee.com/aopmin/docker-compose/blob/master/Linux/RabbitMQ/docker-compose.yml

1、拉取镜像

docker pull rabbitmq:3.8-management

在这里插入图片描述

2、运行容器

在这里插入图片描述

 docker run -di \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=123456 \-v mq-plugins:/plugins \--name rabbitmq \--hostname my-rabbit \-p 15672:15672 \-p 5672:5672 \--restart=always \rabbitmq:3.8-management
  • \ 代表换行
  • -e 指定环境变量
  • -e RABBITMQ_DEFAULT_USER=admin 用户名
  • -e RABBITMQ_DEFAULT_PASS=123456 密码
  • -v 挂载数据卷
  • -p 15672:15672 用于web管理页面使用的端口 (管理员页面,端口15672)
  • -p 5672:5672 用于生产和消费端使用的端口(通信端口,也就是在代码里要使用的)
  • -di ,d后台运行,i打开控制台交互
  • –name mq 容器名字
  • –hostname mq (这个参数在单机版mq配不配置都可以,用来设置主机名,搭建集群会用到);

扩展:启动xxx插件(后面会用到这个命令)

# 进入容器
docker exec -it rabbitmq /bin/bash# 启动xxx插件
rabbitmq-plugins enable xxx

RabbitMQ管理端:

管理端访问地址:http://192.168.150.103:15672/

在这里插入图片描述

在这里插入图片描述


2. 创建用户和虚拟机


1、添加一个新用户:

在这里插入图片描述

添加成功后列表会显示该用户,但是这个用户没有操作权限,需要为他创建一个虚拟机:

在这里插入图片描述


2、创建虚拟机

在这里插入图片描述

为指定用户授权:

在这里插入图片描述

最后该用户就可以操作这个虚拟机了:

在这里插入图片描述


三、RabbitMQ快速入门


使用传统写法完成简单模的消息传递:(特点:一条消息只能被一个消费者消费)

在这里插入图片描述

官方的HelloWorld示例是基于简单消息队列模来实现的,其中包括三个角色:

  • publisher:消息发布者,将消息发送到队列queue;
  • queue:消息队列,负责接受并缓存消息;
  • consumer:订阅队列,处理队列中的消息。

1. 基础环境搭建


1、创建maven工程,并在pom文件中导入如下依赖:

<parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.9.RELEASE</version><relativePath/>
</parent><dependencies><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!--SpringAMQP依赖,可以操作RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!--单元测试--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency>
</dependencies>

2、创建子模块publisher(生产者)、consumer(消费者),并编写启动类和yml配置文件:

# 日志输出格式配置
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS

在这里插入图片描述


2. publisher消息发布者实现


消息收发流程:Connection连接、Channel通道、queue队列和exchange 交换机。

publisher消息发布者实现思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 发送消息
  • 关闭连接和channel

1、编写publisher测试代码:

package cn.aopmin.mq.test;import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.jupiter.api.Test;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 生产者(传统写法)** @author 白豆五* @version 2023/07/2* @since JDK8*/
public class PublisherTest {/*** 发送消息** @throws IOException* @throws TimeoutException*/@Testpublic void testSendMessage() throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost虚拟主机、用户名、密码factory.setHost("192.168.150.103");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("123456");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue"; // 队列名称channel.queueDeclare(queueName, false, false, false, null);// 4.发送消息String message = "hello, rabbitmq!";channel.basicPublish("", queueName, null, message.getBytes());System.out.println("发送消息成功:【" + message + "】");// 5.关闭通道和连接channel.close();connection.close();}
}

在这里插入图片描述


2、在建立连接处打断点,并以debug方式启动(方便观察每个组件的创建)

在这里插入图片描述

查看连接信息:

在这里插入图片描述


回到IDEA继续按F8,查看通道信息:

在这里插入图片描述

在这里插入图片描述


继续按F8,查看队列信息:
在这里插入图片描述
在这里插入图片描述


最后直接放行程序,查看队列中的消息:

在这里插入图片描述
在这里插入图片描述


3. consumer消费者实现


consumer消费者实现思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 订阅消息

1、编写消费者代码

package cn.aopmin.mq.test;import com.rabbitmq.client.*;import java.io.IOException;
import java.util.concurrent.TimeoutException;/*** 消费者(传统写法)* @author 白豆五* @version 2023/04/27* @since JDK8*/
public class ConsumerTest {public static void main(String[] args) throws IOException, TimeoutException {// 1.创建连接工厂ConnectionFactory factory = new ConnectionFactory();// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码factory.setHost("192.168.150.103");factory.setPort(5672);factory.setVirtualHost("/");factory.setUsername("admin");factory.setPassword("123456");// 1.2.建立连接Connection connection = factory.newConnection();// 2.创建通道ChannelChannel channel = connection.createChannel();// 3.创建队列String queueName = "simple.queue";channel.queueDeclare(queueName, false, false, false, null);// 4.订阅消息channel.basicConsume(queueName, true, new DefaultConsumer(channel){// 接收消息的回调函数@Overridepublic void handleDelivery(String consumerTag, Envelope envelope,AMQP.BasicProperties properties, byte[] body) throws IOException {// 5.处理消息String message = new String(body);System.out.println("接收到消息:【" + message + "】");}});System.out.println("等待接收消息。。。。");}
}

在这里插入图片描述

2、测试(消费者启动程序后会一直执行,不用的时候将程序结束即可)

在这里插入图片描述
在这里插入图片描述


四、SpringAMQP与RabbitMQ工作模型


1. SpringAMQP概述


AMQP是消息中间件收发消息的协议(规范),具体实现由各个消息中间厂商实现;(例如 RabbitMQ)

SpringAMQP是基于RabbitMQ封装的一套模板,并且还利用SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAMQP的官方地址:https://spring.io/projects/spring-amqp

在这里插入图片描述

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系;
  • 基于注解的监听器模式,异步接收消息;
  • 封装了RabbitTemplate工具,用于发送消息 。

RabbitMQ工作模型:简单队列模型、工作队列模型、发布订阅模型(广播、路由、主题)。

在这里插入图片描述


2. BasicQueue 基本模型(简单模型)


使用SpringAMQP实现简单模型的消息收发:

在这里插入图片描述

1、在父工程中引入spring-amqp起步依赖:

<!--SpringAMQP:可以操作RabbitMQ-->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

完整的pom.xml配置:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>cn.aopmin</groupId><artifactId>rabbitmq02-BasicQueue</artifactId><version>1.0.0</version><packaging>pom</packaging><description>springAMQP实现简单模型消息传递</description><modules><module>publisher</module><module>consumer</module></modules><properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding></properties><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.2</version><relativePath/></parent><dependencies><!-- lombok --><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></dependency><!-- SpringAMQP:可以操作RabbitMQ --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><!-- 单元测试 --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId></dependency></dependencies><!-- 打包插件 --><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build>
</project>

2、消息发送

2.1、在publisher服务的application.yml中添加rabbitmq配置:

# RabbitMQ配置
spring:rabbitmq:host: 192.168.150.103 # 主机名port: 5672       # 端口virtual-host: /  # 虚拟主机username: admin  # 用户名password: 123456 # 密码# 日志配置
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS

2.2、在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

package cn.aopmin.mq.test;import org.junit.jupiter.api.Test;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
import org.springframework.test.context.junit4.SpringRunner;/*** 使用SpringAMQP实现简单模型的消息发送** @author 白豆五* @version 2023/07/2* @since JDK8*/
@SpringBootTest
// @RequiredArgsConstructor // 生成构造方法(构造器注入,要求注入的字段必须final修饰)
public class SpringAmqpTest {/*** RabbitTemplate是SpringAMQP中的核心类,用于实现消息的发送和接收*/@Autowiredprivate  RabbitTemplate rabbitTemplate;/*** 测试简单模型的消息发送*/@Testpublic void testSimpleQueue() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello, spring amqp!";// 发送消息rabbitTemplate.convertAndSend(queueName, message);}
}

在这里插入图片描述


3、消息接收

3.1、在consumer服务的application.yml中添加rabbitmq配置:

# RabbitMQ配置
spring:rabbitmq:host: 192.168.150.103 # 主机名port: 5672       # 端口virtual-host: /  # 虚拟主机username: admin  # 用户名password: 123456 # 密码# 日志配置
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS

3.2、在consumer服务的com.baidou.mq.listener包中创建SpringRabbitListener类:

package cn.aopmin.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息监听类** @author 白豆五* @version 2023/07/2* @since JDK8*/
@Component
public class SpringRabbitListener {/*** 订阅消息** @param msg 消息* @throws InterruptedException*/@RabbitListener(queues = "simple.queue")  // 配置要监听的队列: simple.queuepublic void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("消费者接收到消息:【" + msg + "】");}
}

4、测试

先启动consumer服务(启动类),然后再运行publisher服务中发送消息的测试代码。
在这里插入图片描述
在这里插入图片描述


3. WorkQueue 工作模型


工作队列模型(Work Queue Mode):消息按照一定的策略分配给多个消费者来解决消息堆积问题,适用于任务分发和负载均衡场景。

角色:生产者、队列、消费者

在这里插入图片描述

使用SpringAMQP实现工作队列模型的消息收发:

1、在消费者监听类中编写两个方法,监听同一个队列,模拟多个消费者。

package cn.aopmin.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息监听类** @author 白豆五* @version 2023/07/2* @since JDK8*/
@Component
public class SpringRabbitListener {/*编写两个方法监听同一个队列,可以实现多个消费者同时消费一个队列的消息*/@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】");}@RabbitListener(queues = "simple.queue")public void listenSimpleQueueMessage2(String msg) throws InterruptedException {System.out.println("消费者2接收到消息:【" + msg + "】");}
}

在这里插入图片描述

2、模拟生产者发多条消息:

/**
* 测试工作模型发消息
*/
@Test
public void testWork() {// 队列名称String queueName = "simple.queue";// 消息String message = "hello,rabbitmq";// 模拟发送100条消息for (int i = 1; i <= 100; i++) {rabbitTemplate.convertAndSend(queueName, message + i);}System.out.println("消息发送完毕!");
}

在这里插入图片描述

3、测试:先启动消费者服务,再执行生产者发送消息的代码。

在这里插入图片描述


消费者预取消息限制:

工作模型默认一人一半消息,可以通过修改消费者application.yml文件,配置prefetch属性,控制消费者预取消息的上限:

# RabbitMQ配置
spring:rabbitmq:host: 192.168.150.103 # 主机名port: 5672       # 端口virtual-host: /  # 虚拟主机username: admin  # 用户名password: 123456 # 密码listener:simple:prefetch: 1  # 消息预取策略(每次获取一条消息,处理完后再获取下一条)

prefetch属性用于指定消费者一次从RabbitMQ服务器预取的消息数量。通过限制预取消息的数量,你可以控制每个消费者同时处理的消息数量,从而实现负载均衡和资源控制。


4. Publish、Subscribe 发布订阅模型


发布订阅模型特点: 可以通过交换机(exchange)将一条消息发给多个队列(消费者)进行处理。

常见的exchange类型包括:Fanout广播、Direct路由、Topic主题。

交换机的主要作用:

  • 接收生产者发送的消息
  • 将消息按照规则路由到绑定过的队列中
  • 它不能缓存消息,路由失败,消息丢失
    在这里插入图片描述

SpringAMQP提供了一个Exchange接口,来表示所有不同类型的交换机:

在这里插入图片描述


4.1 Fanout 广播模型


Fanout Exchange,交换机会把收到的消息发送给绑定过的所有队列。(队列需要与交换机建立关系,然后才能收到对应消息)

在这里插入图片描述


接下来使用SpringAMQP演示Fanout Exchange收发消息:

1、在consumer服务中,利用代码声明队列、交换机,并将两者绑定

package cn.aopmin.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 在消费端声明队列、交换机、绑定关系,这样就不用在rabbitmq管理页面手动创建了,这样在服务启动后springAMQP会自动创建** @author 白豆五* @version 2023/07/2* @since JDK8*/
@Configuration
public class MqConfig {/*** 声明Fanout交换机* 交换机名: exchange.fanout*/@Beanpublic FanoutExchange fanoutExchange() {return new FanoutExchange("exchange.fanout");}/*** 声明队列* 队列名: fanout.queue1*/@Beanpublic Queue queue1() {return new Queue("fanout.queue1");}/*** 声明队列* 队列名: fanout.queue2*/@Beanpublic Queue queue2() {return new Queue("fanout.queue2");}/*** 绑定关系* 将队列1绑定到Fanout交换机上*/@Beanpublic Binding binding1(FanoutExchange fanoutExchange, Queue queue1) {return BindingBuilder.bind(queue1).to(fanoutExchange);}/*** 绑定关系* 将队列2绑定到Fanout交换机上*/@Beanpublic Binding binding2(FanoutExchange fanoutExchange, Queue queue2) {//参数注入,即参数名就是bean的名字return BindingBuilder.bind(queue2).to(fanoutExchange);}
}

消费者application.yml配置:

# RabbitMQ配置
spring:rabbitmq:host: 192.168.150.103 # 主机名port: 5672       # 端口virtual-host: /  # 虚拟主机username: admin  # 用户名password: 123456 # 密码# 日志配置
logging:pattern:dateformat: MM-dd HH:mm:ss:SSS

2、在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2

package cn.aopmin.mq.listener;import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;/*** 消息监听类** @author 白豆五* @version 2023/07/2* @since JDK8*/
@Component
public class FanoutListener {/*编写两个方法,分别监听队列1和队列2*/@RabbitListener(queues = "fanout.queue1")public void listenerFanoutQueue1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenerFanoutQueue2(String msg) throws InterruptedException {System.out.println("消费者2接收到消息:【" + msg + "】");}
}

编写监听类后,启动消费者服务会自动创建交换机和队列组件:

在这里插入图片描述

3、在publisher的测试类中编写向exchange.fanout发消息的代码:

/*** 测试广播模式发送消息*/
@Test
public void testFanout() {// 交换机名称String exchangeName = "exchange.fanout";// 消息String message = "hello,rabbitmq";// 发送消息// 第一个参数是交换机名称// 第二个参数是routingKey(路由key),在广播模式下不需要指定// 第三个参数是消息rabbitTemplate.convertAndSend(exchangeName, "", message);System.out.println("消息发送完毕!");
}

执行测试方法,查看运行结果:

在这里插入图片描述


4.2 Direct 路由模型


Direct exchange,会将接收到的消息按照规则(Routing key)转发到指定的队列,因此称为路由模式(routes)

在交换机上做了一层规则判断操作。

Fanou模型要求:

  • 每一个Queue都与Exchange设置一个BindingKey;
  • 发布者发送消息时,指定消息的RoutingKey;
  • Exchange会将消息路由到BindingKey与消息RoutingKey一致的队列上。

在这里插入图片描述

基于AMQP演示Direct模型::

1、在consumer服务的监听类中,编写两个消费者方法,并在方法上通过@RabbitListener组合注解声明Exchange、Queue、RoutingKey,然后分别监听direct.queue1和direct.queue2队列中的消息:

package cn.aopmin.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import javax.xml.ws.BindingType;/*** 消息监听类 (通过注解的方式声明交换机和队列、及绑定关系)** @author 白豆五* @version 2023/07/2* @since JDK8*/
@Component
public class DirectListener {/*** 在监听方法上通过注解的方式声明交换机和队列、及绑定关系* 队列: 通过@Queue注解创建队列* 交换机: 通过@Exchange注解创建交换机* 绑定关系: 通过bindingkey绑定{"blue", "red"}** @param msg* @throws InterruptedException*/@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),//创建队列exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),//创建direct交换机key = {"blue", "red"} // bindingkey))public void listenDirect1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),//创建队列exchange = @Exchange(name = "exchange.direct", type = ExchangeTypes.DIRECT),//创建direct交换机key = {"yellow", "red"} // bindingkey))public void listenDirect2(String msg) throws InterruptedException {System.out.println("消费者2接收到消息:【" + msg + "】");}
}

在这里插入图片描述


2、在publisher中编写测试方法,向exchange.direct发送消息

/**
* 测试路由模式发送消息
*/
@Test
public void testDirect() {// 交换机名称String exchangeName = "exchange.direct";// 消息String message = "helloworld!";// 发送消息// 第一个参数是交换机名称// 第二个参数是routingKey(路由key,发消息时候用的),在路由模式下需要指定// 第三个参数是消息rabbitTemplate.convertAndSend(exchangeName, "blue", "routingKey:blue ---" + message);rabbitTemplate.convertAndSend(exchangeName, "red", "routingKey:red ---" + message);System.out.println("消息发送完毕!");
}

在这里插入图片描述


3、测试:启动消费者服务创建交换机和队列,然后执行生产者发消息方法

在这里插入图片描述
在这里插入图片描述


小节:

1、Direct与Fanout交换机的区别?

  • Fanout相对于Direct更灵活些。

  • Fanout交换机不做判断,收到消息就会广播给绑定的队列。

  • Direct交换机会根据RouthingKey判断,然后路由给满足规则的队列。

  • 在Direct模型中,如果多个队列都有相同的RouthingKey,则与Fanout功能类似。

2、基于@RabbitListeneri注解声明队列和交换机有哪些常见注解?

  • @QueueBinding 绑定关系
  • @Queue 队列
  • @Exchange 交换机

4.3 Topic 主题模型


Topic Exchange 与 Direct Exchange类似,区别在与RoutingKey必须是多个单词组成,并且以==.== 分割。(用的最多)

队列与交换机指定BindingKey时可以使用通配符:

  • #:表示匹配0或多个单词;例如 china.# 、#.new
  • *:表示匹配1个单词;

在这里插入图片描述

  • Queue1:绑定的是java.# ,因此凡是以 java.开头的routing key 都会被匹配到,例如 java.news、java.blog;
  • Queue2:绑定的是#.news ,因此凡是以 .news结尾的 routing key 都会被匹配到,例如java.news、weather.news、heihe.weather.news。

基于AMQP演示Topic 模型:

1、在消费端的监听方法上声明交换机和队列、及绑定关系

package cn.aopmin.mq.listener;import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;import javax.xml.ws.BindingType;/*** 消息监听类 (通过注解的方式声明交换机和队列、及绑定关系)** @author 白豆五* @version 2023/07/2* @since JDK8*/
@Component
public class TopicListener {@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),//创建队列exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),//创建direct交换机key = "java.#" // bindingkey))public void listenTopic1(String msg) throws InterruptedException {System.out.println("消费者1接收到消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),//创建队列exchange = @Exchange(name = "exchange.topic", type = ExchangeTypes.TOPIC),//创建direct交换机key = "#.news" // bindingkey))public void listenTopic2(String msg) throws InterruptedException {System.out.println("消费者2接收到消息:【" + msg + "】");}
}

2、在生产端,编写发消息方法

/*** 测试主题模式发送消息*/
@Test
public void testTopic() {// 交换机名称String exchangeName = "exchange.topic";// 消息String message = "helloworld!";// 发送消息// 第一个参数是交换机名称// 第二个参数是routingKey(路由key),在路由模式下需要指定// 第三个参数是消息rabbitTemplate.convertAndSend(exchangeName, "java.blog", message);rabbitTemplate.convertAndSend(exchangeName, "java.news", message);System.out.println("消息发送完毕!");
}

3、测试

在这里插入图片描述


小节

1、Direct和Topic交换机的区别?

  • 相同点:两个交换机都会key进行判断。(即消息的路由key与队列的绑定key进行比较)
  • 不同点:Topic队列的绑定key支持通配符更加灵活。Direct队列的绑定key不支持通配符,只能匹配具体key的消息。

5. 消息转换器


默认情况下,Spring会帮我们把发送的任意对象类型消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。
在这里插入图片描述

但是,Spring默认使用的是JDK序列化,JDK序列化会存在一些问题:

  • 数据体积过大;
  • 有安全隐患;
  • 可读性差。

5.1 使用默认消息转换器发送Object类型消息


1、声明队列、编写监听方法

package cn.aopmin.mq.config;import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;/*** 声明队列** @author 白豆五* @version 2023/07/3* @since JDK8*/
@Configuration
public class MqConfig {@Beanpublic Queue ObjectQueue() {return new Queue("object.queue");}
}
@Component
public class ObjectListener {@RabbitListener(queues = "object.queue")public void listenObj(Object obj) {System.out.println("收到消息:" + obj.toString());}
}

2、编写发消息代码

/*** 测试默认消息转换器收发消息(JDK序列化)*/
@Test
public void testDefault() {// 队列名称String queueName = "object.queue";// 对象消息Map<String, Object> message = new HashMap<>();message.put("name", "张三");message.put("age", 18);// 发送消息rabbitTemplate.convertAndSend(queueName, message);System.out.println("消息发送完毕!");
}

测试,查看队列数据:(默认情况下JDK序列化的结果不直观,可以把消息转成json格式发送)

在这里插入图片描述


5.2 使用Jackson消息转换器收发JSON消息


Spring提供了org.springframework.amqp.support.converter.MessageConverter接口来处理对象消息的转换。在AMQP中默认实现是SimpleMessageConverter,而SimpleMessageConverter它基于JDK的ObjectOutputStream完成序列化。

如果我们不想使用默认的消息转换器,只需在生产端和消费端配置MessageConverter类型的Bean即可。

1、生产端和消费端都引入jackson依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

2、生产端和消费的都配置消息转换器

package cn.aopmin.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;//生产端
@SpringBootApplication
public class PublisherApp {public static void main(String[] args) {SpringApplication.run(PublisherApp.class, args);}// 使用json序列化机制,进行消息转换@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
}
package cn.aopmin.mq;import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;//消费端
@SpringBootApplication
public class ConsumerApp {public static void main(String[] args) {SpringApplication.run(ConsumerApp.class, args);}// 使用json序列化机制,进行消息转换@Beanpublic MessageConverter jsonMessageConverter(){return new Jackson2JsonMessageConverter();}
}

3、修改消费端的监听方法

@Component
public class ObjectListener {@RabbitListener(queues = "object.queue")public void listenObj(Map<String,Object> msg) {System.out.println("收到消息:" + msg);}
}

4、测试

在这里插入图片描述


5.3 使用默认消息转换器收发JSON消息


上一种方案,配来配去非常麻烦,而且一旦消息转换器不一样,就不能达到想要的结果。默认情况下,对于字符串类型的消息,默认的JDK消息转换器会使用UTF-8编码将字符串转换为字节数组,并将其作为消息体进行发送。

这样我们在发消息的时候手工将对象序列化为json字符串,在接收消息时再序列化为Java对象即可。

JSON工具类:

<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.76</version>
</dependency>

常用方法:

  • 序列化:JSON.toJSONString(xxx);
  • 反序列化:JSONObject(str,Xxx.class);

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/7826.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

FastDFS【SpringBoot操作FastDFS、复习】(三)-全面详解(学习总结---从入门到深化)

目录 SpringBoot操作FastDFS 复习&#xff1a; SpringBoot操作FastDFS 由GitHub大牛tobato在原作者YuQing与yuqih发布的JAVA客户端基 础上进行了大量重构工作&#xff0c;并于GitHub上发布了FastDFS-Client1.26.5。 主要特性 1 对关键部分代码加入了单元测试&#xff0c;便于…

使用家庭宽带和摄像头,实现公网直播

那天去逛商场看到有个营业厅&#xff0c;本想进去问问有没有存话费送话费的活动&#xff0c;结果被忽悠办了一个19.9升千兆宽带加送一个路由器的业务。 网络环境验证 听他们说现在家庭宽带都是有公网IPV6地址的&#xff0c;立马用电脑试了下确实有IPV6地址。 赶紧随便写了几行…

C++ 数据结构图(1)

1. 图的基本概念 图是由顶点集合及顶点间的关系组成的一种数据结构&#xff1a;G (V&#xff0c; E) &#xff0c;其中&#xff1a; 顶点集合 V {x|x 属于某个数据对象集 } 是有穷非空集合 &#xff1b; E {(x,y)|x,y 属于 V} 或者 E {<x, y>|x,y 属于 V &&…

那些无法避免的弯路

近日&#xff0c;某高校毕业生在校期间窃取学校内网数据&#xff0c;收集全校学生个人隐私信息的新闻引发了人们对互联网生活中个人信息安全问题的再度关注。在大数据时代&#xff0c;算法分发带来了隐私侵犯&#xff0c;在享受消费生活等便捷权利的同时&#xff0c;似乎又有不…

Lion:闭源大语言模型的对抗蒸馏

Lion&#xff1a;闭源大语言模型的对抗蒸馏 Lion&#xff0c;由香港科技大学提出的针对闭源大语言模型的对抗蒸馏框架&#xff0c;成功将 ChatGPT 的知识转移到了参数量 7B的 LLaMA 模型&#xff08;命名为 Lion&#xff09;&#xff0c;在只有 70k训练数据的情况下&#xff0…

84、基于stm32单片机超市自助存储柜快递箱系统设计(程序+原理图+流程图+参考论文+开题报告+任务书+设计资料+元器件清单等)

单片机主芯片选择方案 方案一&#xff1a;AT89C51是美国ATMEL公司生产的低电压&#xff0c;高性能CMOS型8位单片机&#xff0c;器件采用ATMEL公司的高密度、非易失性存储技术生产&#xff0c;兼容标准MCS-51指令系统&#xff0c;片内置通用8位中央处理器(CPU)和Flash存储单元&a…

select 框添加树结构(todu)

1. 案例: 2. 代码 下班了&#xff0c;明天写

短视频seo矩阵+抖音小程序源码开发解决方案(一)

该解决方案主要针对产品用户交易决策周期长/非标定制等情况的企业&#xff0c;如&#xff1a;房产、汽车、金融、咨询服务&#xff0c;广告设计、网络科技公司&#xff0c;TOB类销售行业等。 基于不同的经营场景&#xff0c;解决方案全面更新&#xff0c;新增账号管理&#xf…

【嵌入式Qt开发入门】如何使用Qt进行文本读写——QFile读写文本

在很多时候我们需要读写文本文件进行读写&#xff0c;比如写个 Mp3 音乐播放器需要读 Mp3 歌词里的文本&#xff0c;比如修改了一个 txt 文件后保存&#xff0c;就需要对这个文件进行读写操作。本文介绍简单的文本文件读写&#xff0c;内容精简&#xff0c;让大家了解文本读写的…

JavaWeb 速通HTML(常用标签汇总及演示)

目录 一、拾枝杂谈 1.网页组成 : 1 结构 2 表现 3 行为 2.HTML入门 : 1 基本介绍 2.基本结构 : 3.HTML标签 : 1 基本说明 2 注意事项 二、常用标签汇总及演示 1.font标签 : 1 定义 2 演示 2.字符实体 : 1 定义 2 演示 3.标题标签 : 1 定义 2 演示 4. 超链接标签 : 1…

2023年07月在线IDE流行度最新排名

点击查看最新在线IDE流行度最新排名&#xff08;每月更新&#xff09; 2023年07月在线IDE流行度最新排名 TOP 在线IDE排名是通过分析在线ide名称在谷歌上被搜索的频率而创建的 在线IDE被搜索的次数越多&#xff0c;人们就会认为它越受欢迎。原始数据来自谷歌Trends 如果您相…

【CSS】定位

&#x1f4dd;个人主页&#xff1a;爱吃炫迈 &#x1f48c;系列专栏&#xff1a;HTMLCSS &#x1f9d1;‍&#x1f4bb;座右铭&#xff1a;道阻且长&#xff0c;行则将至&#x1f497; 文章目录 标准流&#xff08;Normal Flow&#xff09;元素定位position属性静态定位-static…