RabbitMQ灵活运用,怎么理解五种消息模型

RabbitMQ灵活运用,怎么理解五种消息模型

  • 简介
  • 一、AMQP协议
  • 二、交换机类型与默认交换机
    • 1. 交换机的四种类型
    • 2. 默认交换机
  • 三、五种模式速览
    • 1. 一对一简单模式
    • 2. work模式(轮询)
    • 3. 发布/订阅模式
    • 4. 路由模式(自称direct模式)
    • 5. Topic模式
  • 四、实例
    • 1. 生产者代码
    • 2. 消费者代码
  • 五、总结


简介

上次我们介绍了,为什么rabbitMQ会被很多人中意选型,从而成为火热的MQ组件,今天就先来说一说MQ的基础使用 ———— 其五种消息模型


一、AMQP协议

我们都知道,RabbitMQ是一个使用Erlang语言,基于AMQP协议的MQ组件,那什么是AMQP协议呢,我们就从这开始今天的学习。

AMQP全称为 Advanced Message Queuing Protocol(高级消息队列协议),是一个面向消息的中间件传输协议,用于在应用程序之间进行异步消息通信。

AMQP协议定义了多种角色和服务,包括生产者、消费者、交换器、队列等。其中生产者负责生成消息,消费者负责接收和处理消息,交换器则负责将消息路由到队列中。如下图
在这里插入图片描述

AMQP协议的消息传输基于消息队列,支持消息的确认、持久化、事务等功能,同时支持不同的消息传输模式,如点对点(point-to-point)和发布-订阅(publish-subscribe)模式。

我们今天要介绍的RabbitMQ的五种模型,其实都是基于AMQP的基础模型或其演变而来

二、交换机类型与默认交换机

1. 交换机的四种类型

在进一步讲解之前,我们需要知道RabbitMQ,交换机有四种类型,分别是:Direct、Fanout、Topic和Headers。

  • direct:
    按照消息的路由键(routing key)完全匹配来投递消息。直接匹配模式,将消息发送到与路由键完全匹配的队列中。direct模式可以使用RabbitMQ自带的默认交换机,所以不需要将交换机进行任何绑定操作

  • topic:
    使用通配符进行模糊匹配,消息会带有一个路由键(routing key),而队列绑定到交换机上时,也可以指定主题,而且还能使用一定的正则形式,只要主题匹配上消息的路由键,该消息就会发送至该队列
    符号“#”匹配一个或多个词 eg:" log.# "能够匹配到‘ log.info.oa ’
    符号“ * ” 匹配不多不少一个词 eg:“ log.* ”只能匹配到“log.erro”

  • headers:
    按照消息头(header)匹配来投递消息。根据消息头的键值对匹配,当消息头与某个绑定的headers完全匹配时,才会将消息发送到该队列中。

  • fanout :
    将接收到的所有消息广播到它知道的所有队列中,如果routing_key 有指定也不会生效

不难发现,这四种类型本质上是告诉交换机应该把消息发送给哪些那些队列的,四种类别对应着四种判断角度。fanout —— 相当于广播,不作任何选择,发送给所有连接的队列;direct —— 消息发送时都附带一个字段叫routing_key,direct 模式的交换机就会直接把该字段理解成队列名,找到对应的队列并发送;topic —— 交换机会把routing_key理解成一个主题,恰好,队列绑定交换机时也可以以缩略形式指定主题,所以找到匹配主题的队列就发送;headers —— 这种模式的交换机不再以消息的routing_key作为判断依据,而是在队列绑定交换机时,每个队列需提供一个Map,当消息发送给交换机时,交换机会解析消息头,看看有没有能和各队列Map吻合的属性,有则发给该队列。

2. 默认交换机

RabbitMQ有一个自带的交换机,也被称为AMQP default exchange。当消息发送到RabbitMQ时,如果没有指定交换机,就会被发送到默认交换机。默认交换机的类型为direct类型,路由键与队列名相同

如果消息的路由键和某个队列的名称一致,那么消息就会被发送到这个队列中。如果消息的路由键和任何一个队列的名称都不一致,那么消息会被丢弃。默认交换机可以通过设置routing_key来指定消息的目的地,例如:

//  将消息发送到名称为test_queue的队列中,空字符串代表默认交换机
channel.basic_publish(exchange="", routing_key="test_queue", body="Hello, RabbitMQ!")

但是,建议应用程序在发送消息时显式地指定交换机,以避免不必要的麻烦或错误。默认交换机只是一个简单的机制,不应被用于复杂的应用程序。

三、五种模式速览

1. 一对一简单模式

在这里插入图片描述

  • 概念
    生产者将消息发送到“ hello”队列。使用者从该队列接收消息。

  • 图解
    “ P”是我们的生产者
    “ C”是我们的消费者
    中间的框是一个队列-RabbitMQ代表使用者保留的消息缓冲区。需要注意的是,虽然看起来生产者直接连接了队列,但是实际上,它连接的是rabbitMQ的默认交换机

在这里插入图片描述


2. work模式(轮询)

在这里插入图片描述

  • 概念
    work模式是一个生产者,一个队列,多个消费者,每个消费者获取到的消息唯一,多个消费者只有一个队列,C1\C2是竞争关系,一个消息被C1消费,C2将没有消息

  • 优点及作用
    一个生产者一个队列多个消费者模式能够并行化工作,解决了消息积压问题

  • 工作原理
    默认情况下,RabbitMQ将每个消息按顺序发送给下一个使用者。平均而言,每个消费者都会收到相同数量的消息。这种分发消息的方式称为循环。

  • 注意
    轮询分发采用平均,导致机器性能的浪费,可以将消费者信道设置如下,代表不公平分发

int preFetchCount = 1;
// 该参数的意思是消费者在接收到队列里的消息但没有返回确认结果之前,队列不会将新的消息分发给该消费者,该设置需要和手动ACK配合使用
channel.basicQos(preFetchCount )


3. 发布/订阅模式

前两种发布模式,在图中都没有画出 Exchange交换机,属于是一种模型的简化,实际上上两种模式,都将消息发布给了rabbitMQz自带的默认交换机,然后默认交换机再将消息转发给消息队列
(PS:每个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同)
在这里插入图片描述

  • 概念
    和模式二不同的是,模式三是没有路由规则,多个队列,多个消费者生产者将消息不是直接发送到队列,而是发送到X交换机,然后由交换机发送给两个队列,两个消费者各自监听一个队列,因此一个消息可以被多个消费者消费

4. 路由模式(自称direct模式)

在这里插入图片描述

  • 概念
    交换机连接接队列发送消息需要指定routing_key,所以模式四就是生产者发送消息到交换机并且要指定routing_key,消费者将队列绑定到交换机时需要指定路由key
  • 算法
    背后的路由算法很简单:消息自带一个routing_key(亦称路由键),交换机会把该消息路由给与其routing_key完全匹配的队列

在这里插入图片描述
在direct模式中,第一个队列以 orange为 key 绑定至交换机 x,第二个队列以black和green绑定。注意:如果有消息没有使用上述key,比如某个消息的key 是 red ,那么该消息将会被丢弃

另外,用相同的路由键可以绑定多个队列
在这里插入图片描述


5. Topic模式

在这里插入图片描述

  • 概念
    topic模式的routing_key使用通配符组成进行模糊匹配

符号“#”匹配一个或多个词 eg:" log.# "能够匹配到‘ log.info.oa ’
符号“ * ” 只匹配一个词 eg:“ log.* ”只能匹配到“log.erro”

举例说明,当使用下列 routing_key 发送消息时:

  1. “quick.orange.rabbit”的消息将传递到两个队列;
  2. “lazy.orange.elephant ”也将发送给他们两个;
  3. “quick.orange.fox ”只会进入Q1,而“ lazy.brown.fox ”只会进入Q2;
  4. “lazy.pink.rabbit ”将被传递到Q2只有一次,即使两个绑定都匹配;
  5. “quick.brown.fox ”与任何绑定都不匹配,因此将被丢弃;
  6. “orange“ 和“”quick.orange.male.rabbit“,因为单词个数对不上,则不会被匹配
  7. “lazy.orange.male.rabbit ”即使有四个单词,也将匹配最后一个绑定,并将其传送到Q2队列。

四、实例

我们以最复杂的Topic模式为例,实际写一段java代码

1. 生产者代码

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class TopicProducer {private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建Topic类型的交换机channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 待发送的消息String routingKey = "topic.key1";String message = "hello rabbitmq, this is topic message";// 发送消息channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println("Sent message: " + message + ", routingKey: " + routingKey);channel.close();connection.close();}
}

2. 消费者代码

import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;public class TopicConsumer {private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws IOException, TimeoutException {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");factory.setUsername("guest");factory.setPassword("guest");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 创建Topic类型的交换机,由于发送者已创建,此步其实可省略,一般由发送方建立交换机// channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 创建一个非持久、独占、自动删除的队列String queueName = channel.queueDeclare().getQueue();// 绑定路由键为 "topic.#" 的消息channel.queueBind(queueName, EXCHANGE_NAME, "topic.#");System.out.println("Waiting for messages. To exit press CTRL+C");// 消费消息Consumer consumer = new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {String routingKey = envelope.getRoutingKey();String message = new String(body, "UTF-8");System.out.println("Received message: " + message + ", routingKey: " + routingKey);}};channel.basicConsume(queueName, true, consumer);}
}

五、总结

rabbitMQ的五种模式就介绍完了,其实后三种模式非常相像,我们可以这么理解Topic模式

当队列用‘#’绑定时它将接收所有消息,与routing_key无关,此时就像fanout模式一样
当队列绑定中不使用‘*’和‘#’时,主题交换就像direct模式一样。

而至于前两种模式,由于模型中不带有交换机,所以生产者和消费者都是直接连接的Queue,则就是直肠子的模式

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/1100.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Redis 简介与数据类型介绍

目录 ​编辑 一、Redis是什么? 二、redis五大基本类型 2.1 String(字符串) 2.1.1 应用场景 1)缓存功能 2)计数器 3)统计多单位的数量 4)共享用户session 2.2 List(列表) 2.2.1 应用场景 1)消息队列 2…

Python学习—装饰器的力量

Python学习—装饰器的力量 作为许多语言都存在的高级语法之一,装饰器是你必须掌握的知识点。 Python的装饰器(Decorator)允许你扩展和修改可调用对象(函数、方法和类)的行为,而无需永久修改可调用的对象本身…

蓝桥杯专题-试题版-【龟兔赛跑预测】【回形取数】【阶乘计算】【矩形面积交】

点击跳转专栏>Unity3D特效百例点击跳转专栏>案例项目实战源码点击跳转专栏>游戏脚本-辅助自动化点击跳转专栏>Android控件全解手册点击跳转专栏>Scratch编程案例点击跳转>软考全系列点击跳转>蓝桥系列 👉关于作者 专注于Android/Unity和各种游…

高性能消息中间件 RabbitMQ

一、RabbitMQ概念 1.1 MQ是什么 消息队列 MQ全称Message Queue(消息队列),是在消息的传输过程中保存消息的容器。多用于系统之间的异步通信。 同步通信相当于两个人当面对话,你一言我一语。必须及时回复: 异步通信相…

Spring是什么?

目录 1、Spring的简介 2、Spring七大功能模块 3、Spring的优点 4、Spring的缺点 5、Sprig容器 6、Spring的生态圈(重点)***** 7、Spring中bean的生命周期 1、Spring的简介 Spring的英文翻译为春天,可以说是给Java程序员带来了春天&…

chatgpt赋能python:关于Python除二取余法的优缺点分析

关于Python除二取余法的优缺点分析 Python是当前数据分析和科学计算最火热的语言之一,其中除二取余法是Python中很有趣的算法之一。它也是很常用的基础算法之一,特别是在图像处理和编码中,非常常用。除二取余法指的是一个数值除以二后的余数…

虚拟文件系统的数据结构

文章目录 虚拟文件系统的数据结构超级快挂载描述符文件系统类型索引节点目录项文件的打开实例和打开文件表 虚拟文件系统的数据结构 虽然不同文件系统类型的物理结构不同,但是虚拟文件系统定义了一套统一的数据结构。 (1)超级块。文件系统的…

Flutter Ping 检查服务器通讯信号强度

Flutter Ping 检查服务器通讯信号强度 前言 对通讯敏感的程序中,我们除了检查当前网络通道外,还要检查与服务器实际的型号强度。 一般我们采用 ping 的方式返回型号的强度和稳定程度。 dart_ping 包 https://pub-web.flutter-io.cn/packages/dart_ping …

Debezium系列之:监控 Debezium 实例

Debezium系列之:监控 Debezium 实例 一、概述二、实现步骤三、执行四、打开Grafana UI五、关闭集群 Debezium JMX相关的技术博客: Debezium系列之:安装jmx导出器监控debezium指标Debezium系列之:为Debezium集群JMX页面增加监控&a…

「一本通 3.2 练习 6」汽车加油行驶

目录 第一步,二维转一维(此步仅为方便,可以省略) 第二步,建边(啥都行,只要死不了) 第三部,bfs(你要dfs也行) 第一步 第二步 第三步 可CA呢…

fatal error: ‘type_traits‘ file not found错误解决

错误如下 In file included from ../test_opencv_qt/main.cpp:1: In file included from ../../Qt/6.5.1/android_x86_64/include/QtGui/QGuiApplication:1: In file included from ../../Qt/6.5.1/android_x86_64/include/QtGui/qguiapplication.h:7: In file included from .…

在?聊聊浏览器事件循环机制

目录 前言 同步/异步编程模型 同步 异步 JS异步模型 调用栈 任务队列 宏任务队列 微任务队列 微任务API 事件循环 队列优先级 混合队列 事件循环实现 总结 参考文章 Event-Loop可视化工具 前言 JS是单线程语言,在某个时间段只能执行一段代码。这…