微服务技术栈之rabbitMQ基础入门(一)

准备工作:

1,创建空的工程:

首先我们先创建一个空的工程,并且命名为 mq-java

2,创建一个生产者springboot工程(plblisher):
设置项目的基本信息:

勾选版本和依赖:
看看项目的结构:
删除一些占时不需要的文件:

精简后的结构:
 

添加依赖:

        <!--mybatis-plus依赖--><dependency><groupId>com.baomidou</groupId><artifactId>mybatis-plus-boot-starter</artifactId><version>3.5.5</version></dependency><!-- mysql依赖--><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version></dependency><!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>
 把application.properties配置文件改成application.yml格式:

 跟据自己实际情况修改:

代码:

server:port: 9091spring:datasource:driver-class-name: com.mysql.cj.jdbc.Driverurl: jdbc:mysql://localhost:3306/mp?useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=Asia/Shanghaiusername: rootpassword: rootmybatis-plus:configuration:log-impl: org.apache.ibatis.logging.stdout.StdOutImplmap-underscore-to-camel-case: true

 测试能否正常启动项目:

3,创建一个消费者springboot工程(consumer)

步骤跟上一步创建生产者一样:

正常运行就可以了

一,初始MQ:

目比较常见的MQ实现:

  • ActiveMQ

  • RabbitMQ

  • RocketMQ

  • Kafka

几种常见MQ的对比:

RabbitMQActiveMQRocketMQKafka
公司/社区RabbitApache阿里Apache
开发语言ErlangJavaJavaScala&Java
协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
可用性一般
单机吞吐量一般;几十万每秒差;10w/s高;100w/s非常高;100w/s
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

二,RabbitMQ

安装MQ:

基于Docker来安装RabbitMQ,使用下面的命令即可:

docker run \-e RABBITMQ_DEFAULT_USER=sde \-e RABBITMQ_DEFAULT_PASS=123 \-v mq-plugins:/plugins \--name mq \--hostname mq \-p 15672:15672 \-p 5672:5672 \--network hm-net\-d \rabbitmq:3.8-management

实际操作:

[root@bogon ~]# docker run \
>  -e RABBITMQ_DEFAULT_USER=sde \
>  -e RABBITMQ_DEFAULT_PASS=123 \
>  -v mq-plugins:/plugins \
>  --name mq \
>  --hostname mq \
>  -p 15672:15672 \
>  -p 5672:5672 \
>  --network hm-net\
>  -d \
>  rabbitmq:3.8-management
922d9368df6bbfb059e806fc72be7bfbc303362f28a7a15a8770a212d95102fb
[root@bogon ~]#

要是下载的慢,我们可以用自己准备好的mq.tar包,然后使用 docker load -i mq.tar加载

可以看到在安装命令中有两个映射的端口:

  • 15672:RabbitMQ提供的管理控制台的端口

  • 5672:RabbitMQ的消息发送处理接口

安装完成后,我们访问 http://192.168.200.128:15672即可看到管理控制台。首次访问需要登录,默认的用户名和密码在安装命令中已经指定了(sde/123)。

http://自己的ip地址+mq端口号

登录后即可看到管理控制台总览页面:

登录成功之后的页面:

RabbitMQ对应的架构如图:

  • publisher:生产者,也就是发送消息的一方

  • consumer:消费者,也就是消费消息的一方

  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

收发消息
交换机:

我们打开Exchanges选项卡,可以看到已经存在很多交换机:

点击任意的一个交换机进入详情页面,进行测试。仍然可利用控制台中的publish message 发送一条消息:

输入消息内容,发送消息:

 这里是由控制台模拟了生产者发送的消息。由于没有路由到队列存储,最终消息丢失了,这样说明交换机没有存储消息的能力。

 

 队列:

我们打开Queues选项卡,新建一个队列:

我们新建两个队列,hello.queue1和hello.queue2这两个

在新建一个队列查看:

 

此时,我们再次向amq.fanout交换机发送一条消息。会发现消息依然没有到达队列!!

怎么回事呢?

发送到交换机的消息,只会路由到与其绑定的队列,因此仅仅创建队列是不够的,我们还需要将其与交换机绑定

绑定关系:

点击Exchanges选项卡,点击amq.fanout交换机,进入交换机详情页,然后点击Bindings菜单,在表单中填写要绑定的队列名称:

输入队列的名字然后点击bind:

在用相同的方式,绑定hello.queue2队列:

看效果:

发送消息:

再次回到exchange页面,找到刚刚绑定的amq.fanout,点击进入详情页,再次发送一条消息:

消息发送成功了

可以看到跟这个交换机绑定的队列,已经接收到了消息:

点击其中一个队列,查看消息:

看到了来自交换机发送的消息:

这个时候如果有消费者监听了MQ的hello.queue1hello.queue2队列,自然就能接收到消息了。

数据隔离
用户管理:

点击Admin选项卡,首先会看到RabbitMQ控制台的用户管理界面:

里的用户都是RabbitMQ的管理或运维人员。目前只有安装RabbitMQ时添加的sde这个用户。仔细观察用户表格中的字段,如下:

  • Namesde,也就是用户名

  • Tagsadministrator,说明sde用户是超级管理员,拥有所有权限

  • Can access virtual host/,可以访问的virtual host,这里的/是默认的virtual host

对于小型企业而言,出于成本考虑,我们通常只会搭建一套MQ集群,公司内的多个不同项目同时使用。这个时候为了避免互相干扰, 我们会利用virtual host的隔离特性,将不同项目隔离。一般会做两件事情:

  • 给每个项目创建独立的运维账号,将管理权限分离。

  • 给每个项目创建不同的virtual host,将每个项目的数据隔离。

比如,我们给商城创建一个新的用户,命名为lisi,密码123:

输入用户名和密码:

你会发现此时hmall用户没有任何Virtual host的访问权限:

别急,接下来我们就来授权。

Virtual host

我们先退出登录:

用我们刚刚创建好的用户登录

切换到刚刚创建的lisi用户登录,密码为123,然后点击Virtual Hosts菜单,进入virtual host管理页:

可以看到目前只有一个默认的virtual host,名字为 /

我们可以给商城项目创建一个单独的virtual host,而不是使用默认的/;新建起名为 /hmall

创建好之后看看效果:

 

由于我们是登录lisi账户后创建的virtual host,因此回到users菜单,你会发现当前用户已经具备了对/lisi这个virtual host的访问权限了:

此时,点击页面右上角的virtual host下拉菜单,切换virtual host/hmall

然后再次查看queues选项卡,会发现之前的队列已经看不到了:

这就是基于virtual host的隔离效果。

三,SpringAMQP

将来我们开发业务功能的时候,肯定不会在控制台收发消息,而是应该基于编程的方式。由于RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

但是,RabbitMQ官方提供的Java客户端编码相对复杂,一般生产环境下我们更多会结合Spring来使用。而Spring的官方刚好基于RabbitMQ提供了这样一套消息收发的模板工具:SpringAMQP。并且还基于SpringBoot对其实现了自动装配,使用起来非常方便。

SpringAmqp的官方地址:

Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

快速入门

在入门案例中,我们就演示这样的简单模型,如图:

也就是:

  • publisher直接发送消息到队列

  • 消费者监听并处理队列中的消息

这种模式一般也就是用于简单的测试,实际生产中很少使用。

为了方便测试,我们使用lisi/123这个账号登录,然后使用控制台。新建一个simple.queue队列

看看添加队列之后:

消息发送:

首先配置MQ地址,在 publisher\src\main\resources\application.yml 中添加配置:

spring:
  rabbitmq:
    host: 192.168.200.128 # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: lisi # 用户名
    password: 123 # 密码

代码:

  rabbitmq:host: 192.168.200.128 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: lisi # 用户名password: 123 # 密码

 编写测试类,进行测试:

我创建了一个名为SimpleTest的测试类:

代码:

@Slf4j
@SpringBootTest
public class SimpleTest {}

 补充内容:

代码:

    @Test@DisplayName("简单的队列测试")public void testSimpleQueue(){//1,队列名称String queueName = "simple.queue";//2,消息String msg = "hello world";//3,发送消息rabbitTemplate.convertAndSend(queueName,msg);}

打开rabbitMQ的控制台查看效果:

已经接收到一条消息了

消息接收:

我们在mq的consumer的yml文件里面,添加配置

代码:

  rabbitmq:host: 192.168.200.131 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

 在com.sde 包下,新建一个listener子包,然后编写一个监听类

代码:

@Slf4j
@Component
public class MqListener {/*** 监听simple.queue队列的消息* @param msg*/@RabbitListener(queues = "simple.queue")public void listenerSimpleQueue(String msg) {System.out.println("接收到的消息是:"+msg);}
}

测试:

 启动consumer的项目:

看看rabbitMQ的控制台:

可以看到消息已经被消费掉了

WorkQueue模型

Work queues,任务模型。简单来说就是让多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

下面我就来模拟这个场景:

首先我们在控制台创建一个队列,命名为work.queue

发送消息:

这次我们循环发送,模拟大量消息堆积现象。

在publisher服务中的Test类中添加一个测试方法:

代码:

    /*** 向队列发送大量消息,模拟消息堆积。* @throws InterruptedException*/@Test@DisplayName("work队列")public void testWorkQueue() throws InterruptedException {//1,队列名称String queueName = "work.queue";//2,消息String msg = "hello world";//3,发送消息for (int i = 0; i < 50; i++) {rabbitTemplate.convertAndSend(queueName,msg);Thread.sleep(30);}}

 可以看到我们的队列中已经有了50条消息:

消息接收:

要模拟多个消费者绑定同一个队列,我们在consumer服务的SpringRabbitListener中添加2个新的方法:

代码:

    /***一个方法消费后沉睡 20毫秒* @param msg* @throws InterruptedException*/@RabbitListener(queues = "work.queue")public void listenerWorkQueue1(String msg) throws InterruptedException {System.out.println("---------消费者一接收到的消息是:"+msg);Thread.sleep(20);}/*** 一个方法消费后沉睡 200毫秒* @param msg* @throws InterruptedException*/@RabbitListener(queues = "work.queue")public void listenerWorkQueue2(String msg) throws InterruptedException {System.out.println("消费者二接收到的消息是:"+msg);Thread.sleep(200);}

注意到这两消费者,都设置了Thead.sleep,模拟任务耗时:

  • 消费者1 sleep了20毫秒,相当于每秒钟处理50个消息

  • 消费者2 sleep了200毫秒,相当于每秒处理5个消息

测试:

看看控制台:

50条消息已经消费完了

最终消费结果如下:

---------消费者一接收到的消息是:hello world
消费者二接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
消费者二接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
消费者二接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
消费者二接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
消费者二接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
消费者二接收到的消息是:hello world
---------消费者一接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world
消费者二接收到的消息是:hello world

可以看到消费者1和消费者2竟然每人消费了25条消息:

  • 消费者1很快完成了自己的25条消息

  • 消费者2却在缓慢的处理自己的25条消息。

也就是说消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力,最终消息处理的耗时远远超过了1秒。这样显然是有问题的。

能者多劳:

我们可以在消费者的yml文件中配置以下信息

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

代码:

  rabbitmq:host: 192.168.200.131 # ?????IPport: 5672 # ??virtual-host: /hmall # ????username: hmall # ???password: 123 # ??listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

 再次测试,发现结果如下

我在往队列里面,发送50条消息

启动消费者,再次看结果:

可以发现,队列1比队列2消费的消息多。

正所谓能者多劳,这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

总结:

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量

交换机类型

在之前的两个测试案例中,都没有交换机Exchange,生产者直接发送消息到队列。而一旦引入交换机,消息发送的模式会有很大变化:

可以看到,在订阅模型中,多了一个exchange角色,而且过程略有变化

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机

  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。

  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:
  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

  • Headers:头匹配,基于MQ的消息头匹配,用的较少

在这里我们只看前三个就行了

Fanout交换机

Fanout,英文翻译是扇出,我觉得在MQ中叫广播更合适。

在广播模式下,消息发送流程是这样的:

  • 1) 可以有多个队列

  • 2) 每个队列都要绑定到Exchange(交换机)

  • 3) 生产者发送的消息,只能发送到交换机

  • 4) 交换机把消息发送给绑定过的所有队列

  • 5) 订阅队列的消费者都能拿到消息

我们的计划是这样的

创建一个名为hmall.fanout的交换机,类型是fanout

创建两个队列,fanout.queue1和fanout.queue2,绑定到hmall.fanout交换机上

声明交换机和队列:

声明一个交换机

声明两个队列:

实现交换机和队列的绑定:

发送消息:

在测试类里面测试进行发消息:

代码:

@Slf4j
@SpringBootTest
public class ExchangeTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Test@DisplayName("测试fanout交换机发送消息")void testSendFanout() throws Exception {//1,交换机名称String exchangeName = "hmall.fanout";//2,发送的消息String msg = "hello every";//3,发送消息rabbitTemplate.convertAndSend(exchangeName, "", msg);}}

 注意:上述的 convertAndSend 方法的第2个参数:路由key 因为没有绑定,所以可以指定为空

可以看到跟hmall.fanout交换机绑定的两个队列,都接收到了消息。

消息接收:

在消费者中新增两个方法,接收队列的消息:

代码:

@Slf4j
@Component
public class ExchangeConfig {/*** 接收fanout.queue1的消息* @param msg*/@RabbitListener(queues = "fanout.queue1")public void receive1(String msg) {System.out.println("接收到fanout.queue1的消息是: " + msg);}/*** 接收fanout.queue2的消息*/@RabbitListener(queues = "fanout.queue2")public void receive2(String msg) {System.out.println("接收到fanout.queue2的消息是: " + msg);}}

 运行consumer消费者:

总结:

交换机的作用是什么?

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

  • 不能缓存消息,路由失败,消息丢失

  • FanoutExchange的会将消息路由到每个绑定的队列

Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模式下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个 RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

案例需求图:

  1. 声明一个名为hmall.direct的交换机

  2. 声明队列direct.queue1,绑定hmall.directbindingKeybludred

  3. 声明队列direct.queue2,绑定hmall.directbindingKeyyellowred

  4. consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2

  5. 在publisher中编写测试方法,向hmall.direct发送消息

声明交换机和队列:

在控制台创建两个队列,direct.queue1和dicect.queue2

声明一个direct类型的交换机:命名为hmall.direct

使用blue和red作为key,绑定direct.queue1到hmall.direct交换机:

使用yellow和red作为key,绑定direct.queue2到hmall.direct交换机:

消息发送:

编写一个测试类,发送消息:

代码:

@Slf4j
@SpringBootTest
public class DirectTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Test@DisplayName("像key为red的队列发送消息")void testSendByRed(){//1,交换机String exchangeName = "hmall.direct";//2,消息内容String msg = "hello red key";//3,发送消息//第一个参数交换机,第二个参数是routingKey,第三个参数是消息rabbitTemplate.convertAndSend(exchangeName,"red",msg);}
}

结果:

 因为routingKey为red两个队列都包含了,所以他们都收到了消息。

测试routingKey是blue的

代码:

    @Test@DisplayName("测试key为blue的队列")void testSendByBlue(){//1,交换机String exchangeName = "hmall.direct";//2,消息内容String msg = "hello blue key";//3,发送消息//第一个参数交换机,第二个参数是routingKey,第三个参数是消息rabbitTemplate.convertAndSend(exchangeName,"blue",msg);}

结果:

可以看到direct.queue1的key有blue,所以它接收到了消息。

测试routingKey包含yellow的队列:

 

代码:

    @Test@DisplayName("测试key为yellow的队列")void testSendByYellow(){//1,交换机String exchangeName = "hmall.direct";//2,消息内容String msg = "hello yellow key";//3,发送消息//第一个参数交换机,第二个参数是routingKey,第三个参数是消息rabbitTemplate.convertAndSend(exchangeName,"yellow",msg);}

结果:

 

消息接收:

在cousumer工程里面,新建一个类,写两个方法测试接收,这两个队列的消息:

代码:

@Slf4j
@Component
public class DirectListener {/*** 接收direct.queue1队列的消息* @param msg*/@RabbitListener(queues = "direct.queue1")public void receive1(String msg) {System.out.println("接收到direct.queue1队列的消息:"+msg);}/*** 接收direct.queue2队列的消息*/@RabbitListener(queues = "direct.queue2")public void receive2(String msg) {System.out.println("接收到direct.queue2队列的消息:"+msg);}}

运行consumer项目进行测试:

 可以看到消息都接收到了

 总结:

描述下Direct交换机与Fanout交换机的差异?

  • Fanout交换机将消息路由给每一个与之绑定的队列

  • Direct交换机根据RoutingKey判断路由给哪个队列

  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定RoutingKey 的时候使用通配符!

RoutingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: goods.insert

通配符规则:
  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

举例:

user.# :可以匹配user.login.add 或者user.login

user.* :可以匹配,user.login 

假如此时publisher发送的消息使用的RoutingKey共有四种:

  • china.news代表有中国的新闻消息;

  • china.weather 代表中国的天气消息;

  • england.news 则代表英国新闻;

  • england.weather 代表英国的天气消息;

解释:

  • topic.queue1:绑定的是china.# ,凡是以 china.开头的routing key 都会被匹配到,包括:

    • china.news

    • china.weather

  • topic.queue2:绑定的是#.news ,凡是以 .news结尾的 routing key 都会被匹配。包括:

    • china.news

    • england.news

接下来,就在控制台演示一下,Topic交换机的用法:

创建一个名为hmall.topic的交换机

创建两个队列,一个是topic.queue1另一个是topic.queue2

和交换机绑定关系:

发送消息:

在publisher服务中编写测试类发送消息

代码:

@Slf4j
@SpringBootTest
public class TopicTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Test@DisplayName("测试发送china.news的key")public void testSendChinaNews(){//1,交换机String exchangeName = "hmall.topic";//2,消息内容String msg = "china的新闻和以china开头都能接受到";//3,发送消息rabbitTemplate.convertAndSend(exchangeName,"china.news",msg);}}

结果:

都接收到消息了

 

 在测试一个发送其他的.news看看那个队列能接收到:

代码:

        @Test@DisplayName("测试发送XXX.news那个队列能接收到消息")public void testSendXxxNews(){//1,交换机String exchangeName = "hmall.topic";//2,消息内容String msg = "不管什么只要以.news结尾就能收到";//3,发送消息rabbitTemplate.convertAndSend(exchangeName,"england.news",msg);}

看结果:

topic.queue2又多了一条消息

消息接收:

在consumer服务中,接收消息:

代码:

@Component
public class TopicListener {/*** 接收topic.queue1的消息* @param msg*/@RabbitListener(queues = "topic.queue1")public void receive1(String msg) {System.out.println("接收到topic.queue1的消息: " + msg);}/*** 接收topic.queue2的消息*/@RabbitListener(queues = "topic.queue2")public void receive2(String msg) {System.out.println("接收到topic.queue2的消息: " + msg);}}

结果:

总结:

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割

  • Topic交换机与队列绑定时的RoutingKey可以指定通配符

  • #:代表0个或多个词

  • *:代表1个词

代码声明队列和交换机

前面我们创建队列和交换机都是使用控制台创建的,但是在实际开发中,这些大多都是我们程序员来定义的。

因此推荐的做法是由程序启动时检查队列和交换机是否存在,如果不存在自动创建。

基本API:

SpringAMQP提供了一个Queue类,用来创建队列:

SpringAMQP还提供了一个Exchange接口,来表示所有不同类型的交换机。

我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:

而在绑定队列和交换机时,需要使用BindBuilder来创建Bindling对象:

fanout示例

在cousumer消费者工程里面,创建一个配置类,声明fanout类型的队列和交换机。

先把我们在控制台创建好的队列和交换机手删除。

队列

交换机

先声明出来了两个队列和一个交换机:

然后进行绑定:

全部代码:

@Configuration
public class FanoutConfig {/*** 声明fanout类型的交换机*/@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("hmall.fanout");}/*** 声明fanout.queue1队列*/@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}/*** 声明fanout.queue2队列*/@Beanpublic Queue fanoutQueue2(){return new Queue("fanout.queue2");}/*** 绑定队列和交换机,绑定队列1* 通过传参的方法*/@Beanpublic Binding fanoutBind1(FanoutExchange fanoutExchange,Queue fanoutQueue1){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}/*** 绑定队列和交换机,绑定队列2* 调用方法绑定*/@Beanpublic Binding fanoutBind2(){return BindingBuilder.bind(fanoutQueue2()).to(fanoutExchange());}
}
启动consumer工程看结果:

可以看到启动的时候报错了,因为我们上面已经写好了,很多监听队列和方法,现在已经删除了。

所以要把这些类,移除ioc容器

解决办法:

把这些都注释掉

再次启动看结果:

交换机有了:

队列也有了

在看看绑定关系:

测试成功了

direct示例

在consumer中创建一个配置类DirectConfig。声明队列和交换机。direct模式由于要绑定多个KEY,会非常麻烦,每一个Key都要编写一个binding:

创建队列和交换机:

创建一个和hmall.direct交换机和direct.queue1队列direct.queue2队列

代码:

    /*** 声明direct类型的hmall.direct的交换机*/@Beanpublic DirectExchange directExchange(){return new DirectExchange("hmall.direct");}/*** 声明direct.queue1队列*/@Beanpublic Queue directQueue1(){return new Queue("direct.queue1");}/*** 声明direct.queue2队列*/@Beanpublic Queue directQueue2(){return new Queue("direct.queue2");}

绑定关系:

代码:

    /*** 绑定队列direct.queue1和交换机,routingKey是red*/@Beanpublic Binding bindingDirectQueue(Queue directQueue1,DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}/*** 绑定队列direct.queue1和交换机,routingKey是blue*/@Beanpublic Binding bindingDirectQueue2(Queue directQueue1,DirectExchange directExchange){return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}/*** 绑定队列dicect.queue2到交换机,routingKey是red*/@Beanpublic Binding bindDirectQueue2(Queue directQueue2, DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}/*** 绑定队列direct.queue2到交换机,routingKey是yellow*/@Beanpublic Binding bindingDirectQueue3(Queue directQueue2,DirectExchange directExchange){return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}
运行测试:

队列结果

交换机结果

在控制台就看到了我们刚刚,写的队列和交换机了。

基于注解声明

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。不过是在消息监听的时候基于注解的方式来声明。

比如我们还是同样的声明队列和交换机,改一下consumer工程的listener包下,对应的方法。

先把创建好的,hmall.direct交换机和direct.queue1队列diect.queue2队列在控制台删除

声明direct类型:

代码:

    /*** 接收direct.queue1队列的消息* @param msg*/@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue1"),exchange = @Exchange(value = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","blue"}))public void testDirectQueue1(String msg){System.out.println("消费者1接收到的消息:"+msg);}/*** 接收direct.queue2队列的消息* @param msg*/@RabbitListener(bindings = @QueueBinding(value = @Queue("direct.queue2"),exchange = @Exchange(value = "hmall.direct",type = ExchangeTypes.DIRECT),key = {"red","yellow"}))public void testDirectQueue2(String msg){System.out.println("消费者2接收到的消息:"+msg);}

看效果:

队列

交换机

 声明topic类型:

我们先把之前创建好的队列和交换机删除

代码:

    /*** 接收topic.queue1的消息* @param msg*/@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue1"),exchange = @Exchange(value = "hmall.topic",type = ExchangeTypes.TOPIC),key = {"china.#"}))public void testTopicQueue1(String msg){System.out.println("接收到topic.queue1的消息: " + msg);}/*** 接收topic.queue2的消息* @param msg*/@RabbitListener(bindings = @QueueBinding(value = @Queue("topic.queue2"),exchange = @Exchange(value = "hmall.topic",type = ExchangeTypes.TOPIC),key = {"#.news"}))public void testTopicQueue2(String msg){System.out.println("接收到topic.queue2的消息: " + msg);}

启动consumer工程

 效果:

看看交换机的绑定关系

消息转换器

Spring的消息发送代码接收的消息体是一个Object:

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

测试默认转换器:
1,创建测试队列:

在consumer工程中,声明一个新的配置类:

MessageConfig类

代码:

@Configuration
public class MessageConfig {/*** 创建一个名为object.queue的队列* @return*/@Beanpublic Queue objectQueue(){return new Queue("object.queue");}
}

效果:

 2,发送Map消息:

在publisher工程中,发送测试消息到object.queue队列

代码:

@Slf4j
@SpringBootTest
public class ObjectTest {@Autowiredprivate RabbitTemplate rabbitTemplate;@Test@DisplayName("测试对象消息")public void testMapToQueue(){//1,队列名称String queueName = "object.queue";//2,发送的消息Map<String, Object> map = new HashMap<>();map.put("name", "sde");map.put("age", 18);//3,发消息rabbitTemplate.convertAndSend(queueName,map);}
}

 看效果

可以看到消息格式非常不友好。

配置JSON转换器:
1,添加依赖:

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

publisherconsumer两个服务中都引入依赖

		<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>

consumer工程

 

publisher工程 

2,配置消息转换器:

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可。

publisher启动类中配置:

代码:

    /*** 配置消息转换器* @return*/@Beanpublic MessageConverter messageConverter(){//1,定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();//2,配置每条消息自动创建id;用于识别不同的消息也可以在页面中基于id判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;}

 consumer启动类中配置:

代码和上面一样

3,测试

在rabbitMQ的控制台中删除 object.queue 队列中的消息;重新启动 consumer

② 执行testMap 发送消息

③ 在rabbitMQ的控制台中;查看消息

清空队列消息

重新发送一条消息

看一条消息:

消费者接收Object

代码:

@Component
public class ObjectListener {@RabbitListener(queues = "object.queue")public void listenObjectQueue(Map<String, Object> map) {System.out.println("【消费者】接收到 object.queue 的消息: " + map );}
}

 效果:

这就RabbitMQ的基础部分已经结束~~~谢谢观看

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/528979.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

京东商品详情接口数据采集—价格,库存,支持高并发

初识API调用 为帮助商家及开发者快速掌握京东API调用方法&#xff0c;本文为大家提供的万邦API工具为例&#xff0c;为读者演示一例API调用过程&#xff0c;并做相应讲解。 item_get-获得JD商品详情 1、API公共参数示例 请求地址: https://api-gw.onebound.cn/jd/item_get …

Spring循环依赖的成因与破局

一、Spring注入类型 Spring 核心功能之一依赖注入&#xff0c;依赖注入是使用 Spring 框架的基本手段&#xff0c;通过他获取各种类型的 bean&#xff0c;但使用不同的依赖注入类型时经常会遇到循环依赖的问题。Spring 依赖注入类型&#xff1a; 字段注入&#xff0c;这是最常…

Word中解决插入脚注导致的分页位置错误问题

先放一个截图&#xff1a; 上面的截图中&#xff0c;样式为标题3的段落“四、固执的念头”前插入了连续型分节符&#xff0c;并且该分节符的样式为正文&#xff0c;前后的正文段落中有脚注&#xff0c;结果在分页时&#xff0c;标题3段落“四、固执的念头”后的正文段落自动进入…

什么台灯对眼睛好?揭秘四款央视推荐的护眼台灯

近年来&#xff0c;随着电子产品的普及&#xff0c;虽说给生活带来了许多便利&#xff0c;不过对于眼睛还没发育完全的孩子而言&#xff0c;经常使用电子产品是非常容易伤眼的&#xff0c;更何况这些孩子每天还需要长时间的用眼学习&#xff0c;眼睛的负担是非常大的。所以在学…

谷粒商城【成神路】-【10】——缓存

目录 &#x1f9c2;1.引入缓存的优势 &#x1f953;2.哪些数据适合放入缓存 &#x1f32d;3.使用redis作为缓存组件 &#x1f37f;4.redis存在的问题 &#x1f9c8;5.添加本地锁 &#x1f95e;6.添加分布式锁 &#x1f95a;7.整合redisson作为分布式锁 &#x1f697…

java学习之路-数据类型与变量

目录 数据类型与变量 1. 字面常量 2. 数据类型 3. 变量 3.1 变量概念 3.2 整型变量 3.2.1 整型变量 3.2.2 长整型变量 3.2.3 短整型变量 3.2.4 字节型变量 3.3 浮点型变量 3.3.1 双精度浮点型 3.3.2 单精度浮点型 3.4 字符型变量 3.5布尔型变量 3.6 类型转换 …

MySQL--索引优化实战篇(2)

前言&#xff1a; 我们常说的SQL优化&#xff0c;简单来说就是索引优化&#xff0c;通过合理创建索引&#xff0c;调整SQL语法等&#xff0c;来提升查询效率&#xff0c;想要进行SQL优化&#xff0c;就必须知道索引的原理&#xff0c;而且能够看懂SQL的执行计划。 MySQL–索引…

数据保护设备的主要功能是什么

数据保护设备在当今数字化时代扮演着至关重要的角色。随着信息技术的迅猛发展&#xff0c;数据的产生、传输和存储量呈现出爆炸式增长&#xff0c;数据的安全性和完整性成为了企业和个人关注的重点。数据保护设备作为保障数据安全的重要手段&#xff0c;正逐渐受到广泛关注和应…

Python图像处理【22】基于卷积神经网络的图像去雾

基于卷积神经网络的图像去雾 0. 前言1. 渐进特征融合网络2. 图像去雾2.1 网络构建2.2 模型测试 小结系列链接 0. 前言 单图像去雾 (dehazing) 是一个具有挑战性的图像恢复问题。为了解决这个问题&#xff0c;大多数算法都采用经典的大气散射模型&#xff0c;该模型是一种基于单…

基于多源信息融合的巡飞弹对地目标识别与毁伤评估

源自&#xff1a;系统仿真学报 作者&#xff1a;徐艺博 于清华 王炎娟 郭策 冯世如 卢惠民 “人工智能技术与咨询” 发布 摘 要 面向利用多枚巡飞弹对地面高防御移动目标进行打击的任务场景&#xff0c;提出一种基于多源信息融合的巡飞弹对地移动目标识别与毁伤评估方法…

Vue2利用创建a标签实现下载本地静态文件到本地电脑上的功能

最近PC项目遇到一个需求&#xff0c;那就是需要前端下载前端代码包里的前端文件到本地&#xff0c;并且可以给下载下来的文件名指定任意的文件名&#xff0c;如下图所示&#xff0c;在前端代码里public里的statics里有个静态文件zswj.pem&#xff0c;页面上有个下载按钮&#x…

基于.Net 的图形验证码模块

&#x1f3c6;作者&#xff1a;科技、互联网行业优质创作者 &#x1f3c6;专注领域&#xff1a;.Net技术、软件架构、人工智能、数字化转型、DeveloperSharp、微服务、工业互联网、智能制造 &#x1f3c6;欢迎关注我&#xff08;Net数字智慧化基地&#xff09;&#xff0c;里面…