RabbitMQ五大常用工作模式

1.简单队列

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 消息生产者

    public class Send {private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "这是一条消息!!!";// 发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println("Send:" + message);}}
    }
    
  • 消息消费者(会一直监听队列)

    public class Recv {private static final String QUEUE_NAME = "hello";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv:" + message);};// 自动确认消息channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
    }
    

2.工作队列

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 工作队列

    • 消息生产能力大于消费能力,增加多个消费节点
    • 和简单队列类似,增加多个消费节点,处于竞争关系
    • 默认策略:round robin轮训
  • 生产者

    public class Send {private static final String QUEUE_NAME = "work_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 批量发送10个消息for (int i = 0; i < 10; i++) {String message = "这是一条消息!!!" + i;// 发送消息channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8));System.out.println("Send:" + message);}}}
    }
    
  • 消费者1

    public class Recv1 {private static final String QUEUE_NAME = "work_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 模拟消费者缓慢try {TimeUnit.SECONDS.sleep(3);} catch (InterruptedException e) {throw new RuntimeException(e);}String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv1:" + message);// 手工确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 关闭自动确认消息channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
    }
    
  • 消费者2

    public class Recv2 {private static final String QUEUE_NAME = "work_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 声明队列channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 模拟消费者缓慢try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {throw new RuntimeException(e);}String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv2:" + message);// 手工确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 关闭自动确认消息channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> {});}
    }
    
  • 轮训策略验证

    • 先启动两个消费者,再启动生产者
    • 缺点:存在部分节点消费过快,部分节点消费慢,导致不能合理处理消息
  • 公平策略验证

    • 修改消费者策略
    • 解决消费者能力消费不足的问题,降低消费时间问题

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

3.RabbitMQ的Exchange交换机

  • 生产者将消息发送到Exchange,交换机将消息路由到一个或者多个队列中,交换机有多个类型,队列和交换机是多对多的关系
  • 交换机只负责转发消息,不具备存储消息的能力,如果没有队列和交换机绑定或者没有符合的路由规则,则消息会被丢失
  • RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange,最后一种基本不用

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 交换机类型
    • Direct exchange定向
      • 将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配
      • 处理路由键
    • Fanout exchange广播
      • 只需要简单的将队列绑定到交换机上,一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息
      • Fanout交换机转发消息是最快的,用于发布订阅广播形式
      • 不处理路由键
    • Topic exchange通配符
      • 主题交换机是一种发布/订阅的模式,结合了直连交换机与扇形交换机的特点
      • 将路由键和某模式进行匹配,此时队列需要绑定在一个模式上
      • 符号"#“匹配一个或多个词,符号”*"匹配不多不少一个词
    • Headers exchange(很少用)
      • 根据发送的消息内容中的headers属性进行匹配,在绑定Queue与Exchange时指定一组键值对
      • 当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配
      • 如果完全匹配则消息会路由到该队列,否则不会路由到该队列
      • 不处理路由键

4.发布订阅模型

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 什么是RabbitMQ的发布订阅模式

    • 发布订阅模型中,消息生产者不再是直接面对队列,而是直面交换机,都需要经过交换机来进行消息的发送,所有发往同一个fanout交换机的消息都会被所有监听这个交换机的消费者接收
    • 发布订阅模型引入fanout交换机
  • 发布订阅模型应用场景

    • 微信公众号
    • 新浪微博关注
  • RabbitMQ发布订阅模型

    • 通过把消息发送给交换机,交换机转发给对应绑定的队列
    • 交换机绑定的队列是排他独占队列,自动删除
  • 发送端

    public class Send {private static final String EXCHANGE_NAME = "fan_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 绑定交换机,广播类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);String message = "广播发送消息:这是一条消息!!!";// 发送消息channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes(StandardCharsets.UTF_8));System.out.println("Send:" + message);}}
    }
    
  • 消费端(两个节点)

    public class Recv1 {private static final String EXCHANGE_NAME = "fan_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机,广播类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);// 获取队列(排它队列)String queueName = channel.queueDeclare().getQueue();// 绑定队列和交换机channel.queueBind(queueName, EXCHANGE_NAME, "");// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv1:" + message);};// 自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
    }
    

5.路由模式

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 什么是RabbitMQ的路由模式

    • 交换机类型是direct

    • 队列和交换机绑定,需要指定一个路由键(也叫binding key)

    • 消息生产者发送消息给交换机,需要指定路由键

    • 交换机根据消息的路由键,转发给对应的队列

  • 消息生产者

    public class Send {private static final String EXCHANGE_NAME = "direct_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 绑定交换机,直连类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);String error = "我是错误日志";String info = "我是info日志";String warning = "我是warning日志";// 发送消息channel.basicPublish(EXCHANGE_NAME, "error", null, error.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "info", null, info.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "warning", null, warning.getBytes(StandardCharsets.UTF_8));System.out.println("Send:消息发送成功!");}}
    }
    
  • 消费者一(只接收错误消息)

    public class Recv1 {private static final String EXCHANGE_NAME = "direct_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机,直连类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 获取队列String queueName = channel.queueDeclare().getQueue();// 绑定队列和交换机channel.queueBind(queueName, EXCHANGE_NAME, "error");// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv1:" + message);};// 自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
    }
    
  • 消费者二(接收全部消息)

    public class Recv2 {private static final String EXCHANGE_NAME = "direct_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机,直连类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);// 获取队列String queueName = channel.queueDeclare().getQueue();// 绑定队列和交换机channel.queueBind(queueName, EXCHANGE_NAME, "error");channel.queueBind(queueName, EXCHANGE_NAME, "info");channel.queueBind(queueName, EXCHANGE_NAME, "warning");// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv2:" + message);};// 自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
    }
    

6.主题通配符模式

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

  • 什么是RabbitMQ的主题模式

    • 交换机是topic,可以实现发布订阅模式fanout和路由模式direct的功能,更加灵活,支持通配符匹配
    • 交换机通过通配符进行转发到对应的队列,*代表一个词,#代表1个或多个词,一般用#作为通配符居多,词与词之间使用.点进行分割
    • 注意:交换机和队列绑定时用的binding使用通配符的路由键;生产者发送消息时需要使用具体的路由键
  • 生产者

    public class Send {private static final String EXCHANGE_NAME = "topic_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {// 绑定交换机,主题类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);String error = "我是错误日志";String info = "我是info日志";String warning = "我是warning日志";// 发送消息channel.basicPublish(EXCHANGE_NAME, "error", null, error.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "info", null, info.getBytes(StandardCharsets.UTF_8));channel.basicPublish(EXCHANGE_NAME, "warning", null, warning.getBytes(StandardCharsets.UTF_8));System.out.println("Send:消息发送成功!");}}
    }
    
  • 消费者一(只接收错误消息)

    public class Recv1 {private static final String EXCHANGE_NAME = "topic_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机,主题类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 获取队列String queueName = channel.queueDeclare().getQueue();// 绑定队列和交换机channel.queueBind(queueName, EXCHANGE_NAME, "error");// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv1:" + message);};// 自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
    }
    
  • 消费者二(接收全部消息)

    public class Recv2 {private static final String EXCHANGE_NAME = "topic_mq";public static void main(String[] args) throws Exception {// 连接工厂ConnectionFactory factory = new ConnectionFactory();factory.setHost("192.168.101.128");factory.setPort(5672);factory.setUsername("admin");factory.setPassword("Gen123");factory.setVirtualHost("/");Connection connection = factory.newConnection();Channel channel = connection.createChannel();// 绑定交换机,主题类型channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);// 获取队列String queueName = channel.queueDeclare().getQueue();// 绑定队列和交换机channel.queueBind(queueName, EXCHANGE_NAME, "#");// 回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("Recv2:" + message);};// 自动确认消息channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {});}
    }
    

7.工作模式总结

  • 简单模式

    • 一个生产者一个消费者,不用指定交换机,使用默认交换机
  • 工作队列模式

    • 一个生产者多个消费者,可以有轮训和公平策略,不用指定交换机,使用默认交换机
  • 发布订阅模式

    • fanout类型交换机,通过交换机和队列绑定,不用指定绑定路由键,生产者发送消息到交换机,fanout交换机直接进行转发,消息不用指定routingkey路由键
  • 路由模式

    • direct类型交换机,通过交换机和队列绑定,指定绑定的路由键,生产者发送消息到交换机,交换机根据消息的路由key进行转发到对应的队列,消息要指定routingkey路由键
  • 通配符模式

    • topic交换机,通过交换机和队列绑定,指定绑定的通配符路由键,生产者发送消息到交换机,交换机根据消息的路由键进行转发到对应的队列,消息要指定routingkey路由键

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/474976.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

谷达冠楠电商:现在开网店能赚钱吗

随着互联网技术的迅猛发展&#xff0c;电子商务已成为现代商业的重要组成部分。许多人纷纷涉足网店经营&#xff0c;希望通过线上渠道实现创业梦想。然而&#xff0c;“现在开网店能赚钱吗?”这个问题的答案并不是绝对的&#xff0c;而是取决于多种因素。 网络市场的低门槛和广…

软考高项总结:第8章整合管理

一、管理基础 1、项目整合管理由项目经理负责,项目经理负责整合所有其他知识领域的成果,并掌握项目总体情况。项目整合管理的责任不能被授权或转移,项目经理必须对整个项目承担最终责任。整合是项目经理的一项关键技能。执行项目整合时项目经理承担双重角色: (1)组织层…

C++智能指针的知识!

个人主页&#xff1a;PingdiGuo_guo 收录专栏&#xff1a;C干货专栏 大家好呀&#xff0c;我是PingdiGuo_guo&#xff0c;今天我们来学习一下智能指针。 文章目录 1.智能指针的概念 2.智能指针的思想 3.智能指针的作用 3.1 自动内存管理 3.2 共享所有权 3.3 避免悬挂指针…

【开源】SpringBoot框架开发校园疫情防控管理系统

目录 一、摘要1.1 项目介绍1.2 项目录屏 二、功能模块2.1 学生2.2 老师2.3 学校管理部门 三、系统展示四、核心代码4.1 新增健康情况上报4.2 查询健康咨询4.3 新增离返校申请4.4 查询防疫物资4.5 查询防控宣传数据 五、免责说明 一、摘要 1.1 项目介绍 基于JAVAVueSpringBoot…

React 更改程序入口点(index.js文件位置变更)

食用前提示&#xff1a;本文基于已经快速配置好的React环境而作&#xff0c;配置React环境详见拙作&#xff1a;React环境配置-CSDN博客~ 一、了解默认入口点 使用create-react-app快速搭建react环境后&#xff0c;npm start启动程序的默认入口点为/src/index(即src目录下的ind…

ChatGPT的大致原理

国外有个博主写了一篇博文&#xff0c;名字叫TChatGPT: Explained to KidsQ」&#xff0c; 直译过来就是&#xff0c;给小孩子解释什么是ChatGPT。 因为现实是很多的小孩子已经可以用父母的手机版ChatGPT玩了 &#xff0c;ChatGPT几乎可以算得上无所不知&#xff0c;起码给小孩…

Linux——信号(1)

在我们使用Linux系统的时候我们经常会使用ctrl c的方式来终止进程&#xff0c;也 会使用kill命令来杀掉进程&#xff0c;评判进程退出的健康程度中也有信号的身影。那 么Linux中的信号到底是什么&#xff1f;今天就由我来介绍Linux中的信号。1. 信号的概念 要了解计算机中的信…

一.重新回炉spring框架: 理解Spring IoC

1. 写在前面的话 说实话&#xff0c;重试java开发工作时间也不短了&#xff0c;对于spring框架&#xff0c;也是天天用&#xff0c;这期间也碰到了很多问题&#xff0c;也解决了很多问题。可是&#xff0c;总感觉对spring还是一知半解&#xff0c;不能有个更加全面的理解。既然…

C++初阶:容器适配器介绍、stack和queue常用接口详解及模拟实现

介绍完了list类的相关内容后&#xff1a;C初阶&#xff1a;适合新手的手撕list&#xff08;模拟实现list&#xff09; 接下来进入新的篇章&#xff0c;stack和queue的介绍以及模拟&#xff1a; 文章目录 1.stack的初步介绍2.stack的使用3.queue的初步介绍4.queue的使用5.容器适…

SpringBoot实现OneDrive文件上传

SpringBoot实现OneDrive文件上传 源码 OneDriveUpload: SpringBoot实现OneDrive文件上传 获取accessToken步骤 参考文档&#xff1a;针对 OneDrive API 的 Microsoft 帐户授权 - OneDrive dev center | Microsoft Learn 1.访问Azure创建应用Microsoft Azure&#xff0c;使…

阿里云香港服务器cn2速度测试和租用价格表

阿里云香港服务器中国香港数据中心网络线路类型BGP多线精品&#xff0c;中国电信CN2高速网络高质量、大规格BGP带宽&#xff0c;运营商精品公网直连中国内地&#xff0c;时延更低&#xff0c;优化海外回中国内地流量的公网线路&#xff0c;可以提高国际业务访问质量。阿里云服务…

百度智能云分布式数据库 GaiaDB-X 与龙芯平台完成兼容认证

近日&#xff0c;百度智能云的分布式关系型数据库软件 V3.0 与龙芯中科技术股份有限公司的龙芯 3C5000L/3C5000 处理器平台完成兼容性测试&#xff0c;功能与稳定性良好&#xff0c;获得了龙架构兼容互认证证书。 龙芯系列处理器 通用 CPU 处理器是信息产业的基础部件&#xf…