分布式消息队列:RabbitMQ(1)

目录

一:中间件

二:分布式消息队列 

2.1:是消息队列

2.1.1:消息队列的优势

2.1.1.1:异步处理化

2.1.1.2:削峰填谷

2.2:分布式消息队列

2.2.1:分布式消息队列的优势

2.2.1.1:数据的持久化

2.2.1.2:可扩展性

2.2.1.3:应用解耦

2.2.1.4:发送订阅 

2.2.2:分布式消息队列的应用场景 

三:Rabbitmq

3.1:基本概念

3.2:快速入门 

3.2.1:引入消息队列Java客户端

3.2.2:单消费开发生产者和消费者

 3.2.3:多消费开发生产者和消费者

3.3.3:交换机

3.3.3.1:交换机的类别

a):fanout


一:中间件

连接多个系统,帮助多个系统紧密协作的技术(组件)

二:分布式消息队列 

2.1:是消息队列

概念:存储消息的队列

关键词:存储,消息,队列

存储:存储数据

消息:某种数据结构,比如l字符串,对象,二进制数据,json等

队列:先进先出的数据结构

作用:在不同的系统下,应用之间实现消息的传输,不需要考虑传输应用的编程语言,系统和,框架等等,实现应用解耦的作用。

        eg:可以让Java开发的应用发消息,让php开发的应用收消息。

 针对生产者来说:不需要关心消费者什么时候接受消息,什么时候消费,我只需要把我的工作完成就好了。生产者和消费者之间实现了解耦。

针对上图,同样我们会发现,当小李要别的书籍的时候,小王也可以将别的书籍放到消息队列中。生产者和消费者从某一种程度上实现了解耦合。

2.1.1:消息队列的优势

2.1.1.1:异步处理化

生产者发送消息之后,可以继续去忙别的,消费者什么时候消费都可以,不产生阻塞。

2.1.1.2:削峰填谷

先把用户的请求放到消息队列种,消费者(实际执行操作的应用)可以按照自己的需求,慢慢去取。

举个栗子:

原本:

12点时来了10万个请求,原本情况下,10万个请求都在系统内部立刻处理,很快系统压力过大宕机。

现在:

把10万个请求放到消息队列中,处理系统以自己的恒定速率(比如每秒1个)慢慢执行,稳定处理。

2.2:分布式消息队列

2.2.1:分布式消息队列的优势

分布式消息队列继承于消息队列的优势,并进行了一部分的拓展。

2.2.1.1:数据的持久化

把消息集中存储在硬盘当中,服务器重启就不会丢失。

2.2.1.2:可扩展性

可以根据需求,随时增加(或减少)节点,继续保持稳定的服务。

2.2.1.3:应用解耦

可以连接不同语言(Java,PHP),框架开发的系统,让这些系统读取数据。

示例:

以前的项目:

加了分布式消息队列之后的项目: 

1:一个系统挂了,不影响另一个系统。

2:系统挂了之后并恢复,仍然可以从消息队列中取消息

3:只要发送消息到队列,就可以立即进行返回,不用同步调用所有系统,性能更高

2.2.1.4:发送订阅 

假设情景:当QQ进行了一部分改革之后,其他使用QQ的APP也应该处理
这部分改革。
QQ做了一个情景,要让其他系统知道,比如公告消息。如果QQ一次性给这些应用发消息,所引出的问题如下:
1.每次发通知都要调用很多系统,很麻烦,很可能失败
2.不知道哪个系统需要这些QQ的改革。
解决方案:大的核心系统始终往消息队列发消息,其他的系统都去订阅这个消息队列的消息,用的时候进行取就OK。

2.2.2:分布式消息队列的应用场景 

1:耗时场景。

2:高并发场景。

3:分布式系统的协作。(跨团队,跨业务合作,应用解耦)

4:强稳定的场景(金融业务,持久化,可靠性,削锋填谷)  

三:Rabbitmq

特点:生态好,易学习,易于理解,时效性强,支持不同语言的客户端,扩展性,可用性都很不错。

3.1:基本概念

AMPQ协议:Rabbitmq是遵循AMPQ协议的一种消息中间件。

生产者:发消息到交换机

消费者:收消息的,从某个队列中取消息

交换机(exchange):负责把消息转发到对应的队列

队列(Queue):存储消息的

路由(Rountes):转发,怎么把一个消息从一个地方转发到另一个地方(比如生产者转发到某个队列)

Rabbitmq:端口占用   5672:程序连接的端口 15672:管理界面端口

Rabbitmq的安装:https://blog.csdn.net/qq_25919879/article/details/113055350

管理器页面打不开:http://t.csdnimg.cn/6FqZl

3.2:快速入门 

3.2.1:引入消息队列Java客户端

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.17.0</version></dependency>

3.2.2:单消费开发生产者和消费者

生产者端代码:

public class SingeProducer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();//频道相当于客户端(jdbcClient,redisClient),提供了和消队列server建立通信,程序通过channel进行发送消息Channel channel = connection.createChannel()) {//创建消息队列,第二个参数(durable):是否开启持久化,第三个参数exclusiove:是否允许当前这个创建消息队列的//连接操作消息队列 第四个参数:没有人使用队列,是否需要删除channel.queueDeclare(QUEUE_NAME, false, false, false, null);//发送消息String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println(" [x] Sent '" + message + "'");}}
}

消费者代码:

public class SingeConsumer {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {//创建连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");//创建频道,提供通信Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//如何处理消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [x] Received '" + message + "'");};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });}
}

 3.2.3:多消费开发生产者和消费者

场景:一个生产者给队列里面发了一条消息,多个消费者进行消费。适用于多个机器同时去接收并处理任务(每个机器处理任务有限)

队列持久化:

durable:

参数设置为true,服务器队列不丢失

 channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);

 消息持久化:

 指定MessageProperties.PERSISTENY_TEXT_PLAIN参数

    channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));

生产者端代码:

public class MultiProducer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);Scanner scanner=new Scanner(System.in);while(scanner.hasNext()){String message = scanner.nextLine();channel.basicPublish("", TASK_QUEUE_NAME,MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}
}

消费者代码: 

在消费者代码中,如何测验一个消费者只能取一个任务,我们利用for循环来进行解决。

指定确认某条消息:

第一个参数:获取消息的信息

第二个参数:如果是true,把所有的历史消息全都确认了。如果为false,取出当前的消息。

   //第二个参数:是否一次性取所有的消息。如果为true,则要取所有的挤压在消息队列中的消息//如果为false,则为一次性取一个消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

指定拒绝某条消息

第一个参数:获取消息的信息

第二个参数:如果是true,则代表是否要拒绝所有的历史消息。

第三个参数:如果是false, 则代表失败的任务是否要重新入队。

  channel.basicNack(delivery.getEnvelope().getDeliveryTag(),false,false);
public class MultiConsumer {private static final String TASK_QUEUE_NAME = "multi_queue";public static void main(String[] argv) throws Exception {//建立连接ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");final Connection connection = factory.newConnection();for (int i = 0; i <= 2; i++) {final Channel channel = connection.createChannel();int finalI=i;//声明队列channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);System.out.println(" [*] Waiting for messages. To exit press CTRL+C");//控制单个消费者的任务积压数:每个消费者最多处理一个任务,每个消费者智能处理一个任务channel.basicQos(1);//处理从队列中取的的消息DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");try {//处理工作System.out.println(" [x] Received '" +"编号:"+finalI+ message + "'");//停20秒模拟一个机器处理工作能力有限Thread.sleep(20000);//第二个参数:是否一次性取所有的消息。如果为true,则要取所有的挤压在消息队列中的消息//如果为false,则为一次性取一个消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);} catch (InterruptedException e) {throw new RuntimeException(e);} finally {System.out.println(" [x] Done");channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);}};//开启消费监听channel.basicConsume(TASK_QUEUE_NAME, false, deliverCallback, consumerTag -> { });}}}

3.3.3:交换机

一个生产者给多个队列发消息,一个生产者对多个队列。交换机:转发功能,怎么把消息转发到不同的队列上。

3.3.3.1:交换机的类别
a):fanout

场景:很适用于发布订阅的场景。

特点:消息会被转发到所有绑定到交换机的队列。

生产者代码:当生产者发送消息后,由交换机放到消息队列中,消费者从消息队列中取。

public class FonoutProducer {private static final String EXCHANGE_NAME = "1";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {//创建交换机channel.exchangeDeclare(EXCHANGE_NAME, "fanout");Scanner scanner=new Scanner(System.in);while(scanner.hasNext()){String message = scanner.nextLine();channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("UTF-8"));System.out.println(" [x] Sent '" + message + "'");}}}}

消费者代码:

public class FonoutConsumer {private static final String EXCHANGE_NAME = "1";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();Channel channel2= connection.createChannel();//声明交换机//创建队列,随机分配一个队列名称channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName="xiaowang";channel.queueDeclare(queueName,true,false,false,null);channel.queueBind(queueName, EXCHANGE_NAME, "");channel2.exchangeDeclare(EXCHANGE_NAME, "fanout");String queueName2="xiaoli";channel2.queueDeclare(queueName2,true,false,false,null);channel2.queueBind(queueName2,EXCHANGE_NAME,"");System.out.println(" [*] Waiting for messages. To exit press CTRL+C");DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [小王] Received '" + message + "'");};DeliverCallback deliverCallback2 = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println(" [小李] Received '" + message + "'");};channel.basicConsume(queueName, true, deliverCallback1, consumerTag -> { });channel.basicConsume(queueName2, true, deliverCallback2, consumerTag -> { });}}

运行结果:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/151754.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

手写RPC框架

文章目录 什么是RPC框架RPC框架中的关键点通信协议序列化协议动态代理和反射 目前已有的RPC框架手写RPC框架介绍项目框架项目执行流程项目启动 什么是RPC框架 RPC&#xff08;Remote Procedure Call&#xff0c;远程过程调用&#xff09;, 简单来说遵循RPC协议的就是RPC框架. …

人工智能基础_机器学习003_有监督机器学习_sklearn中线性方程和正规方程的计算_使用sklearn解算八元一次方程---人工智能工作笔记0042

然后我们再来看看,如何使用sklearn,来进行正规方程的运算,当然这里 首先要安装sklearn,这里如何安装sklearn就不说了,自己查一下 首先我们还是来计算前面的八元一次方程的解,但是这次我们不用np.linalg.solve这个 解线性方程的方式,也不用 直接 解正规方程的方式: 也就是上面…

Mysql,SqlServer,Oracle获取库名 表名 列名

先看下需求背景&#xff1a; 获取某个数据源连接下所有库名&#xff0c;库下所有表名&#xff0c;表中所有字段 1.MySql 先说MySql吧&#xff0c;最简单 1.1获得所有数据库库名 这是一个mysql和sqlserver公用的方法&#xff0c;这里url不用担心数据库问题&#xff0c;他其实…

Android13源码添加系统服务

本文基于Android 13的framework层添加系统接口&#xff0c;为应用层提供读写函数、以及执行命令! 添加接口 frameworks/base/core/java/android/app/IDevices.aidl package android.app; interface IDevices {//读取文件String readFile(String path);//写入文件void writeF…

设计模式之中介模式

文章目录 一、介绍二、生活中的中介模式三、中介模式中的角色四、案例演示1. 角色分析 五、优缺点 一、介绍 中介模式(Mediator Pattern)&#xff0c;属于行为型设计模式。目的是把系统中对象之间的调用关系从一对多转变成一对一的调用关系&#xff0c;以此来降低多个对象和类…

2023全新TwoNav开源网址导航系统源码 | 去授权版

2023全新TwoNav开源网址导航系统源码 已过授权 所有功能可用 测试环境&#xff1a;NginxPHP7.4MySQL5.6 一款开源的书签导航管理程序&#xff0c;界面简洁&#xff0c;安装简单&#xff0c;使用方便&#xff0c;基础功能免费。 TwoNav可帮助你将浏览器书签集中式管理&#x…

LeetCode:1465. 切割后面积最大的蛋糕(C++)

目录 1465. 切割后面积最大的蛋糕 题目描述&#xff1a; 实现代码与解析&#xff1a; 贪心 原理思路&#xff1a; 1465. 切割后面积最大的蛋糕 题目描述&#xff1a; 矩形蛋糕的高度为 h 且宽度为 w&#xff0c;给你两个整数数组 horizontalCuts 和 verticalCuts&#xff…

基于机器视觉的行人口罩佩戴检测 计算机竞赛

简介 2020新冠爆发以来&#xff0c;疫情牵动着全国人民的心&#xff0c;一线医护工作者在最前线抗击疫情的同时&#xff0c;我们也可以看到很多科技行业和人工智能领域的从业者&#xff0c;也在贡献着他们的力量。近些天来&#xff0c;旷视、商汤、海康、百度都多家科技公司研…

安装使用vcpkg的简易教程

目录 1. 首先安装vcpkg2. 在vcpkg目录下运行bootstrap-vcpkg.bat 命令3. 接着vs进行集成4. 使用vcpkg搜索可用的包5.下载安装所需包6.下载安装完成 1. 首先安装vcpkg 使用git命令下载 git clone https://github.com/Microsoft/vcpkg.git如果下载失败可直接下载文件 (vcpkg-ma…

计算机视觉 激光雷达结合无监督学习进行物体检测的工作原理

一、简述 激光雷达是目前正在改变世界的传感器。它集成在自动驾驶汽车、自主无人机、机器人、卫星、火箭等中。该传感器使用激光束了解世界,并测量激光击中目标返回所需的时间,输出是点云信息,利用这些信息,我们可以从3D点云中查找障碍物。 从自动驾驶汽车的角度看激光雷达…

【Java】电子病历编辑器源码(云端SaaS服务)

电子病历编辑器极具灵活性&#xff0c;它既可嵌入到医院HIS系统中&#xff0c;作为内置编辑工具供多个模块使用&#xff0c;也可以独立拿出来&#xff0c;与第三方业务厂商展开合作&#xff0c;为他们提供病历书写功能&#xff0c;充分发挥编辑器的功能。 电子病历基于云端SaaS…

8、电路综合-基于简化实频的SRFT微带线的带通滤波器设计

8、电路综合-基于简化实频的SRFT微带线的带通滤波器设计 此处介绍微带线综合的巴特沃斯带通滤波器和切比雪夫带通滤波器的设计方法。对于理查德域的网络综合技术而言&#xff0c;这种带通综合和低通综合在本质上并无区别&#xff0c;因为理查德域函数是周期的。低通滤波器的SR…