详解SpringCloud微服务技术栈:一文速通RabbitMQ,入门到实践

👨‍🎓作者简介:一位大四、研0学生,正在努力准备大四暑假的实习
🌌上期文章:详解SpringCloud微服务技术栈:DockerCompose部署微服务集群
📚订阅专栏:微服务技术全家桶
希望文章对你们有所帮助

RabbitMQ的使用还是很广泛的,主要是用在异步通讯的过程中的消息中间件,而在之前我学习Redis的时候,已经分别通过阻塞队列和Redis的某种数据结构实现了异步通信,可以看我的这两篇总结文章:
Redis:原理速成+项目实战——Redis实战9(秒杀优化)
Redis:原理速成+项目实战——Redis实战10(Redis消息队列实现异步秒杀)

同步通讯与异步通讯的原理、优缺点就不在这里讲解了,之前提到过,做异步通讯的最主流的还得是RabbitMQ,所以速成一波。

RabbitMQ入门到实践

  • MQ常见技术介绍
  • RabbitMQ快速入门
    • 介绍和安装(基于Centos7)
    • 消息模型介绍
    • 简单队列模型
  • SpringAMQP
    • 基本介绍
    • 入门案例
      • 消息发送
      • 消息接收
    • Work Queue 工作队列模型
    • 发布订阅模型
      • FanoutExchange
      • DirectExchange
      • TopicExchange
    • 消息转换器

MQ常见技术介绍

MQ(MessageQueue),即存放消息的队列,也就是事件驱动架构中的Broker。

RabbitMQActiveMQRocketMQKafka
可用性一般
单机吞吐量一般非常高
消息延迟微秒级毫秒级毫秒级毫秒以内
消息可靠性一般一般

RabbitMQ看起来最劣势的地方是单机吞吐量,但是其吞吐量也足够满足大多数企业的需求了。国内用的比较多的是Kafka和RabbitMQ,前者适合拥有海量数据,且对信息安全没有那么高要求的情景。

RabbitMQ快速入门

介绍和安装(基于Centos7)

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:
RabbitMQ官网

安装步骤:

1、下载镜像

方式一:在线拉取

docker pull rabbitmq:3-management

方式二:本地下载镜像包,上传到虚拟机后,使用命令进行加载:
镜像包安装:

链接:https://pan.baidu.com/s/1L-Kzd8PWMYaBwGQPwI1z9g?pwd=mjt5
提取码:mjt5

加载命令:

docker load -i mq.tar

2、安装MQ
执行下面命令来运行MQ容器:

docker run \-e RABBITMQ_DEFAULT_USER=itcast \-e RABBITMQ_DEFAULT_PASS=123321 \--name mq \--hostname mq1 \-p 15672:15672 \-p 5672:5672 \-d \rabbitmq:3-management

其中,15672是管理平台的端口,5672是做消息通信的端口。
现在就可以直接访问RabbitMQ后台管理界面并登录:
在这里插入图片描述
在这里插入图片描述
RabbitMQ的结构:
在这里插入图片描述
发送者将消息发送到交换机exchange,然后再发到队列,消费者从队列中获取消息并处理。每个RabbitMQ的用户都有一个自己的VirtualHost,且互相隔离。

RabbitMQ的几个概念:

channel:操作MQ的工具
exchange:路由消息到队列中
queue:缓存消息
virtual host:虚拟主机,是对queue、exchange等资源的逻辑分组

消息模型介绍

MQ的官方文档给了5种MQ的Demo,对应了几种不同的用法:

1、基本消息队列(BasicQueue)
2、工作消息队列(WorkQueue)
3、发布订阅(Publish、Subscribe),又根据交换机类型不同分为三种:
(1)Fanout Exchange:广播
(2)Direct Exchange:路由
(3)Topic Exchange:主题

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包括三个角色:

publisher:消息发布者,将消息发送到队列queue
queue:消息队列,负责接受并缓存消息
consumer:订阅队列,处理队列中的消息

而并没有交换机。

简单队列模型

虽是简单队列模型,但要用的API太多了,写起来复杂,直接导入下面的工程跑一下publisher测试类再跑一下consumer测试类,自行调试并在RabbitMQ的后台界面查看相关信息(admin、connection、channel等)。

链接:https://pan.baidu.com/s/1crv6sUmKM44Crj8u–J4GA?pwd=smjn
提取码:smjn

基本消息队列的消息发送流程:

1、建立connection
2、创建channel
3、利用channel声明队列
4、利用channel向队列发送消息

基本消息队列的消息接收流程:

1、建立connection
2、创建channel
3、利用channel声明低劣
4、定义consumer的消费行为handleDilivery()
5、利用channel将消费者与队列绑定

SpringAMQP

SpringAMQP可以大大的简化消息发送与接收的代码。在这里进行SpringAMQP的介绍,并且利用它来实现RabbitMQ中的五种消息队列模型。

基本介绍

AMQP即为Advanced Message Queuing Protocol(先进消息队列协议),是用于在应用程序之间传递业务消息的开放标准。该协议与语言和平台无关,更符合微服务中独立性的要求。
SpringAMQP即为AMQP的一种实现,基于AMQP协议的定义的一套API规范,提供了模板来发送和接收消息。包含2部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。

入门案例

利用SpringAMQP实现Rabbit中的入门案例——HelloWorld中的基础消息队列功能。
先操作下面的流程:

1、在父工程中引入spring-amqp的依赖(发送和接收都要用到)

	<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2、在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
3、在consumer服务中编写消费逻辑,绑定simple.queue这个队列

消息发送

1、在publisher服务中编写application.yml,添加mq连接信息:

spring:rabbitmq:host: 192.168.177.130 # RabbitMQ的ip地址port: 5672 # RabbitMQ的端口username: itcastpassword: 123321virtual-host: / 

2、在publisher服务中新建一个测试类,编写测试方法,利用RabbitTemplate发送消息到simple.queue这个队列(从之前的测试中创建出来的,没有的话去自行运行创建出来即可):

@RunWith(SpringRunner.class)
@SpringBootTest
public class SpringAmqpTest {@Resourceprivate RabbitTemplate rabbitTemplate;@Testpublic void testSendMessage2SimpleQueue(){String queueName = "simple.queue";String message = "hello, spring amqp";rabbitTemplate.convertAndSend(queueName, message);}
}

在这里插入图片描述
在这里插入图片描述

消息接收

在consumer中编写消费逻辑,监听simple.queue:
1、在consumer服务中国编写application.yml,添加mq连接信息:

spring:rabbitmq:host: 192.168.177.130 # RabbitMQ的ip地址port: 5672 # RabbitMQ的端口username: itcastpassword: 123321virtual-host: / 

2、在consumer服务中新建一个类,编写消费逻辑:

@Component // 注册成一个bean
public class SpringRabbitListener {@RabbitListener(queues = "simple.queue") //监听的队列名public void listenSimpleQueue(String msg){System.out.println("消费者接受到simple.queue的消息:【" + msg + "】");}
}

这个消费行为应该要自动的让其进行,所以将其交给Spring托管,只需要执行Spring的启动类函数Application即可自动实现监听。
在这里插入图片描述
在这里插入图片描述

Work Queue 工作队列模型

在这里插入图片描述
如果消息很多,显然consumer1与consumer2要一起执行,这是一种合作关系。
如果publisher发送消息的频率很高,而一个consumer无法处理完,这时候就需要其它consumer帮助,即为WorkQueue(工作队列)模型。
接下来要模拟WorkQueue,实现一个队列绑定多个消费者。

实现的基本思路如下:
1、在publisher服务中定义测试方法,每秒产生50条消息,发送到simple.queue

	@Testpublic void testSendMessage2WorkQueue() throws InterruptedException {String queueName = "simple.queue";String message = "hello, message__";for (int i = 0; i <= 50; i++) {rabbitTemplate.convertAndSend(queueName, message + i);Thread.sleep(20);}}

2、在consumer服务中定义两个消息监听者,都监听simple.queue队列

	@RabbitListener(queues = "simple.queue") //监听的队列名public void listenWorkQueue(String msg) throws InterruptedException {System.out.println("消费者1接受到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(20); //每秒消费50条}@RabbitListener(queues = "simple.queue") //监听的队列名public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2接受到消息:【" + msg + "】" + LocalTime.now());Thread.sleep(200); //每秒消费5条}

这样理论上消费者是可以自己处理完消息的。然而启动进行测试可以发现,消费者1和消费者2都执行了25条,且消费者1执行完了,消费者2还在那低效率执行,直到结束,导致1s发送的50条消息用了5s才执行完毕:
在这里插入图片描述
造成这个问题的原因是因为RabbitMQ的消息预取机制,它给两个消费者都平均分配了消息,但是consumer2相对没那样的能力处理那么多消息,所以应该要让consumer2少拿一点任务。

解决方式是在application.yml中设置preFetch,可以控制预取消息的上限:
在这里插入图片描述

发布订阅模型

之前的模型,只要消息被其中之一的消费者消费了,这个消息就会消失。而发布订阅模型与之前案例的区别就是允许将同一消息发送给多个消费者,实现的方式是加入了交换机exchange。结构如下:
在这里插入图片描述
当消息被交换机安排到了多个队列去,自然就可以实现该消息被多个消费者给消费。
常见exchange类型包括:
(1)Fanout:广播
(2)Direct:路由
(3)Topic:话题

FanoutExchange

FanoutExchange会将接收到的消息路由到每一个跟其绑定的queue,绑定可以由SpringAMQP提供的API去声明队列和交换机并且实现绑定。
在这里插入图片描述
演示流程:
1、在consumer服务中,利用代码声明队列、交换机,并将两者绑定

@Configuration
public class FanoutConfig {//声明交换机@Beanpublic FanoutExchange fanoutExchange(){return new FanoutExchange("itcast.fanout");}//声明队列@Beanpublic Queue fanoutQueue1(){return new Queue("fanout.queue1");}//将队列1绑定到交换机@Beanpublic Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);}@Beanpublic Queue fanoutQueue2() {return new Queue("fanout.queue2");}@Beanpublic Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);}
}

执行主程序,Spring将会自动读取配置并执行:
在这里插入图片描述
在这里插入图片描述

2、在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2:

	@RabbitListener(queues = "fanout.queue1") //监听的队列名public void listenFanoutQueue1(String msg){System.out.println("消费者接受到fanout.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2") //监听的队列名public void listenFanoutQueue2(String msg){System.out.println("消费者接受到fanout.queue2的消息:【" + msg + "】");}

编写完毕后重启ConsumerApplication。

3、在publisher中编写测试方法,向itcast.fanout发送消息

	@Testpublic void testSendFanoutExchange(){//交换机名称String exchangeName = "itcast.fanout";//消息String message = "hello, every one!";//发送消息rabbitTemplate.convertAndSend(exchangeName, "", message);}

在这里插入图片描述

DirectExchange

DirectExchange会将接收到的消息根据规则路由到指定的Queue,因此称为路由模式(routes):

1、每一个Queue都和Exchange设置一个BindingKey
2、发布者发送消息时,指定消息的RoutingKey
3、Exchange将消息路由到BindingKey与消息RouingKey一致的队列

需要注意的是,队列之间是可以绑定相同的BindingKey的,一个队列可以绑定多个BindingKey,所以如果publisher发送消息会被多个queue读取,也就是广播,因此DirectExchange是可以模拟FanoutExchange的。

实现流程如下:
1、利用@RabbitListener声明Exchange、Queue、RoutingKey(不再用bean声明,要声明一堆的东西,太复杂了)
2、在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
上面2步一起在consumer的监听类中完成:

	@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenDirectQueue1(String msg){System.out.println("消费者1接受到direct.queue1的消息" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg){System.out.println("消费者1接受到direct.queue2的消息" + msg + "】");}

在这里插入图片描述

3、在publisher中编写测试方法,向itcast. direct发送消息

	@Testpublic void testSendDirectExchange(){//交换机名称String exchangeName = "itcast.direct";//消息String message = "hello, blue!";//发送消息rabbitTemplate.convertAndSend(exchangeName, "blue", message);}

在这里插入图片描述

TopicExchange

TopicExchange与DirectExchange类似,区别在于其routingKey必须是多个单词的列表,分别以.分割。

Queue与Exchange指定BindingKey时可以使用通配符:

#:0个或多个单词
*:一个单词

实现流程:
1、利用@RabbitListener声明Exchange、Queue、RoutingKey
2、在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
上面两个步骤的代码:

	@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue1"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "china.#"))public void listenTopicQueue1(String msg){System.out.println("消费者1接受到topic.queue1的消息" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "topic.queue2"),exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),key = "#.news"))public void listenTopicQueue2(String msg){System.out.println("消费者1接受到topic.queue2的消息" + msg + "】");}

3、在publisher中编写测试方法,向itcast.topic发送消息

	@Testpublic void testSendTopicExchange(){//交换机名称String exchangeName = "itcast.topic";//消息String message = "南京航空航天大学第五轮学科评估有7个A类评分!";//发送消息rabbitTemplate.convertAndSend(exchangeName, "china.news", message);}

在这里插入图片描述

消息转换器

Ctrl+P查看参数,可以发现发出的消息的类型都是Object类型的。
测试发送Object类型消息。
说明SpringAMQP允许我们发任何对象,比如List,Map,但是RabbitMQ是以字节传输的,这说明了SpringAMQP会帮我们序列化为字节后发送。
Spring的对消息对象的处理是基于JDK的ObjectOutputStream完成序列化,这种序列化的性能差,而且安全有问题,数据长度也会很长。所以最好换一个序列化方式。
推荐用JSON方式序列化,步骤如下:
1、在publisher服务引入依赖:

	<dependency><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></dependency>

2、在publisher服务声明MessageConverter:

	@Beanpublic MessageConverter messageConverter(){return (MessageConverter) new Jackson2JsonMessageConverter();}

消息的接收也需要和上面一样:引入相同的依赖,然后在consumer服务中定义MessageConverter。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/422273.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Joern环境的安装(Windows版)

Joern环境的安装(Windows版) 网上很少有关于Windows下安装Joern的教程&#xff0c;而我最初使用也是装在Ubuntu虚拟机中&#xff0c;这样使用很占内存&#xff0c;影响体验感。在Windows下使用源码安装Joern也是非常简单的过程&#xff1a; 提前需要的本地环境&#xff1a; …

优秘智能:颠覆传统广告模式,引领数字人口播潮流,助力企业高效推广!

随着人工智能技术的飞速发展&#xff0c;AI已经渗透到我们生活的方方面面。在这个数字化时代&#xff0c;优秘智能科技有限公司凭借其强大的研发实力和创新能力&#xff0c;成功研发出一款AI数字人口播短视频制作&#xff08;制作&#xff1b;ai6ai69&#xff09;平台&#xff…

前端开发 0: 前端环境搭建

欢迎来到我即将展开的一系列博客&#xff0c;将带你踏上前端开发的奇妙之旅&#xff01;在这个数字化时代&#xff0c;前端开发成为了构建现代网页和应用程序的关键技能。无论你是刚刚入门还是已经有一些经验&#xff0c;这个系列将为你提供宝贵的知识和实用的技巧&#xff0c;…

MapReduce概述

文章目录 1. 分布式系统的驱动力和挑战2. 分布式系统的抽象和实现工具3. 可扩展性、可用性、一致性4. MapReduce基本工作方式5. Map函数和Reduce函数 1. 分布式系统的驱动力和挑战 分布式系统的核心是通过网络来协调&#xff0c;共同完成一致任务的一些计算机。构建分布式系统…

实现VLAN之间的路由

原理&#xff1a;路由器子接口 一个接口允许多个VLAN通过&#xff08;避免占用物理路由器接口&#xff09;。 目标 第 1 部分&#xff1a;单臂路由 第 2 部分&#xff1a;配置第三层交换机的路由端口 第 3 部分&#xff1a;带SVI的VLAN间路由 第 4 部分&#xff1a;补充知…

SpikingJelly笔记之IFLIF神经元

文章目录 前言一、脉冲神经元二、IF神经元1、神经元模型2、神经元仿真 三、LIF神经元1、神经元模型2、神经元仿真 总结 前言 记录整合发放(integrate-and-fire, IF)神经元与漏电整合发放(leaky integrate-and-fire, LIF)神经元模型&#xff0c;以及在SpikingJelly中的实现方法…

1.redhat网卡配置

想要通过cmd ping通redhat 1.在redhat输入:ifconfig 将自己主机网络适配器VMware Network Adapter VMnet1的IPv4配置在同一网段,掩码是255.255.255.0,所以最后一位不同就可以 推荐用FileZilla远程上传文件

Flash读取数据库中的数据

Flash读取数据库中的数据 要读取数据库的记录&#xff0c;首先需要建立一个数据库&#xff0c;并输入一些数据。数据库建立完毕后&#xff0c;由Flash向ASP提交请求&#xff0c;ASP根据请求对数据库进行操作后将结果返回给Flash&#xff0c;Flash以某种方式把结果显示出来。 …

网页无法访问但是有网什么原因

目录 1.运行网络诊断&#xff0c;确认原因 原因A.远程计算机或设备将不接受连接(该设备或资源(Web 代理)未设置为接受端口“7890”上的连接 原因B.DNS服务器未响应 场景A.其他的浏览器可以打开网页&#xff0c;自带的Edge却不行 方法A&#xff1a;关闭代理 Google自带翻译…

gitlab设置/修改克隆clone地址端口

直接看代码吧&#xff0c;最近写的太多了 修改前 修改后 vi /opt/gitlab/embedded/service/gitlab-rails/config/gitlab.yml 将port修改为自己在安装gitlab时映射的能够拉取到项目的端口即可 按esc后:wq后gitlab-ctl restart即可

20240122在WIN10下给GTX1080配置CUDA驱动

20240122在WIN10下给GTX1080配置CUDA驱动 2024/1/22 19:09 缘起&#xff1a;为了使用openai的whisper识别小语种【非英语】电影的字幕&#xff0c;决定开始折腾CUDA了&#xff01; https://github.com/openai/whisper https://www.bilibili.com/video/BV1d34y1F7qA https://ww…

Redis相关面试题大全

&#x1f4d5;作者简介&#xff1a; 过去日记&#xff0c;致力于Java、GoLang,Rust等多种编程语言&#xff0c;热爱技术&#xff0c;喜欢游戏的博主。 &#x1f4d7;本文收录于java面试题系列&#xff0c;大家有兴趣的可以看一看 &#x1f4d8;相关专栏Rust初阶教程、go语言基…