tp8 使用rabbitMQ(3)发布/订阅

发布/订阅

当我们想把一个消息,发送给 多个消费者的时候,我们把这种模式叫做发布/订阅模式,比如我们做两个消费者,其中一个消费者把消息写入磁盘中,别一个消费者把消息结果输出到屏幕上,就要用到发布订阅模式

  • 发布者(producer)是发布消息的应用程序。
  • 队列(queue)用于消息存储的缓冲。
  • 消费者(consumer)是接收消息的应用程序。

下面的图才是 rabbitmq 的完整模式, 中间是有交换机的
在这里插入图片描述

交换机

RabbitMQ消息模型的核心理念是:发布者(producer)不会直接发送任何消息给队列。事实上,发布者(producer)甚至不知道消息是否已经被投递到队列。
发布者(producer)只需要把消息发送给一个交换机(exchange)。交换机非常简单,它一边从发布者方接收消息,一边把消息推送到队列。交换机必须知道如何处理它接收到的消息,是应该推送到指定的队列还是是多个队列,或者是直接忽略消息。这些规则是通过交换机类型(exchange type)来定义的。

我们之前的 简单队列和工作队列中,没有提来交换机的概念。

默认交换机

当我们使用RabbitMQ时,如果不指定交换机的类型,那么Rabbit会使用默认的一个交换机,这个默认的交换机类型是一个直连交换机(direct),后续新建的队列(queue)都会自动绑定到这个默认交换机上,绑定的路由键就是队列的名称,注意这个默认交换机的名称是一个空字符串 " "

交换机的种类有多种

直连交换机(direct), 主题交换机(topic), (头交换机)headers和 扇型交换机(fanout)
头交换机的性能不好, 基本不用
用的最多的还是 扇形交换机 相当于是广播
前面两节中,我们只使用了下面的代码,其实是使用的默认交换机,没有定义,直接使用了

$channel->basic_publish($msg, '', 'hello');

发布订阅模式中,我们使用 扇形交换机 fanout 代码如下

$channel->exchange_declare('logs', 'fanout', false, false, false);
$channel->basic_publish($msg, 'logs');

上面两段代码比较,第一段,因为使用了默认的交换机,所以没有交换机的定义语句, 但是在发布的时候,中间那个参数是 “”,这样就指定了默认交换机的名称, 第二段,我们指明了要使用的交换机 fanout 所以在发布的时候,使用的是自定义的交换机名称


因为有了交换机,生产者代码中只需要把 message 发送给交换机就可以了, 所以生产者中不需要创建队列,创建队列放到 消费者中就可以了,(如果我们一定要把创建队列的时机放在生产者中,也是可以的, 个人根据需要灵活应用)

交换机和队列的绑定(这里应该是在消费者代码中出现的)

我们创建了交换机,并且有了N个队列,它们之间要建立绑定关系,才可以分发到相应有绑定的队列中

$channel->queue_bind($queue_name, 'hello');  //这样就把队列名称和交换机名称做了绑定

下面的 完整的代码示例

生产者

<?php
declare (strict_types = 1);namespace app\command;use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class PubSubMQProduce extends Command
{protected function configure(){// 指令配置$this->setName('pubsubmqproduce')->setDescription('发布订阅模式的生产者');}protected function execute(Input $input, Output $output){//获取连接$connection = $this->getConnectRabbitMQ();//创建通道$channel = $connection->channel();//创建交换机/*** params exchange  自定义交换机名称* params type  交换机的类型, 一般都会使用 扇形(fanout)* params passive 是否消极声名* params durable 是否持久化* params auto_delete 是否自动删除* params internal 设置是否内置的, true表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中, 只能通过交换器路由到交换器这个方式* params  nowait 相当于做一个异步版的声明,不等待返回,就让程序继续执行*/$channel->exchange_declare("exchangeName","fanout",false,false,false,false,false);//现在生产者只需要把消息发给交换机就可以了,所以不用在生产者中创建队列了(当然,想创建也是可以的)for ($i = 0; $i < 20; $i++) {$msgArr = ["name"=>"haha".$i,"age"=>'10'.$i,"sex"=>"female".$i];$msg = new AMQPMessage(json_encode($msgArr),["delivery_mode"=>AMQPMessage::DELIVERY_MODE_PERSISTENT]);sleep(1);$channel->basic_publish($msg,"exchangeName");}$channel->close();$connection->close();}protected function getConnectRabbitMQ(){try{$connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");return $connection;}catch(Exception $e){throw new Exception("队列连接失败");}}
}

消费者

<?php
declare (strict_types = 1);namespace app\command;use ba\Exception;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use think\console\Command;
use think\console\Input;
use think\console\input\Argument;
use think\console\input\Option;
use think\console\Output;class PubSubMQConsumer extends Command
{protected function configure(){// 指令配置$this->setName('pubsubmqconsumer')->setDescription('发布订阅模式的消费者');}protected function execute(Input $input, Output $output){$connection = $this->connectRabbitMQ();$channel = $connection->channel();//创建两个队列$channel->queue_declare("queueName1",false,false,false,false,false);$channel->queue_declare("queueName2",false,false,false,false,false);//绑定交换机和队列,交换机的名称是在生产者中定义的$channel->queue_bind("queueName1","exchangeName");$channel->queue_bind("queueName2","exchangeName");//设置消息处理函数$callback1 = function($msg){$msgArr = json_decode($msg->body,true);echo "这是(显示)处理数据的队列NO1  ".$msgArr["name"]."-11-".$msgArr["age"]."-11-".$msgArr["sex"].PHP_EOL;$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);   //这里让就是消息的应答了};$callback2 = function($msg){$msgArr = json_decode($msg->body,true);echo "这是(保存)处理数据的队列NO2  ".$msgArr["name"]."-22-".$msgArr["age"]."-22-".$msgArr["sex"].PHP_EOL;$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);   //这里让就是消息的应答了};$channel->basic_consume("queueName1","",false,false,false,false,$callback1);$channel->basic_consume("queueName2","",false,false,false,false,$callback2);while(count($channel->callbacks)){$channel->wait();}}protected function connectRabbitMQ(){try{$connection = new AMQPStreamConnection("192.168.3.228",'5672',"admin","123456");return $connection;}catch(Exception $e){throw new Exception("队列连接失败");}}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/214908.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

井盖位移传感器生产厂家推荐,时刻感知井盖

马路上的井盖虽然看似微不足道&#xff0c;但实际上对于行人的“脚下安全”起着至关重要的作用。这些井盖下连接着供排水、燃气、电力、供热、通信等功能的管路和线路&#xff0c;是城市生命线运行的重要保障。因此保持井盖状态正常、明确管理责任是确保车辆和行人安全通行的重…

电源控制系统架构(PCSA)之系统分区电压域

目录 4.1 电压域 4.1.1 系统逻辑 4.1.2 Always-On逻辑 4.1.3 处理器Clusters 4.1.4 图形处理器 4.1.5 其他功能 4.1.6 SoC分区示例 本章描述基于Arm组件的SoC划分为电压域和电源域。 所描述的选择并不详尽&#xff0c;只是可能性的一个子集。目的是描述基于Arm组件的SoC…

重生之我是一名程序员 37 ——C语言中的栈溢出问题

哈喽啊大家晚上好&#xff01; 今天呢给大家带来一个烧脑的知识——C语言中的栈溢出问题。那什么是栈溢出呢&#xff1f;栈溢出指的是当程序在执行函数调用时&#xff0c;为了保护函数的局部变量和返回地址&#xff0c;将这些数据存储在栈中。如果函数在函数调用时使用了过多的…

微服务实战系列之签名Sign

前言 昨日恰逢“小雪”节气&#xff0c;今日寒风如约而至。清晨的马路上&#xff0c;除了洋洋洒洒的落叶&#xff0c;就是熙熙攘攘的上班族。眼看着&#xff0c;暖冬愈明显了&#xff0c;叶子来不及泛黄就告别了树。变化总是在不经意中发生&#xff0c;容不得半刻糊涂。 上集博…

实现HTTP服务监听,快来试试springboot服务端接口公网远程调试

&#x1f308;个人主页&#xff1a;聆风吟 &#x1f525;系列专栏&#xff1a;网络奇遇记、Cpolar杂谈 &#x1f516;少年有梦不应止于心动&#xff0c;更要付诸行动。 文章目录 &#x1f4cb;前言一. 本地环境搭建1.1 环境参数1.2 搭建springboot服务项目 二. 内网穿透2.1 安装…

SSM大学生社团信息管理系统-99953,(免费领取源码)计算机毕业设计选题开题+程序定制+论文书写+答辩ppt书写 包售后 全流程

SSM大学生社团信息管理系统APP 摘 要 随着科学技术的飞速发展&#xff0c;社会的方方面面、各行各业都在努力与现代的先进技术接轨&#xff0c;通过科技手段来提高自身的优势&#xff0c;高校当然也不能排除在外。大学生社团信息管理系统APP是以实际运用为开发背景&#xff0c…

友思特分享 | Neuro-T:零代码自动深度学习训练平台

来源&#xff1a;友思特 智能感知 友思特分享 | Neuro-T&#xff1a;零代码自动深度学习训练平台 欢迎关注虹科&#xff0c;为您提供最新资讯&#xff01; 工业自动化、智能化浪潮涌进&#xff0c;视觉技术在其中扮演了至关重要的角色。在汽车、制造业、医药、芯片、食品等行业…

HDX读卡器牛羊管理RFID设备品牌

半双工HDX&#xff08;Half Duplex&#xff09;技术是ISO11784/5中规定的另一种标签与读写器之间的通讯方式&#xff0c;与全双工工&#xff08;FDX&#xff09;相比&#xff0c;HDX通常识别能力更强&#xff0c;有更大的识别距离。在HDX读写器的射频场与HDX标签响应期间关闭&a…

Linux:文件系统初步理解

文章目录 文件的初步理解C语言中对文件的接口系统调用的接口位图的理解open调用接口 文件和进程的关系进程和文件的低耦合 如何理解一切皆文件&#xff1f; 本篇总结的是关于Linux中文件的各种知识 文件的初步理解 在前面的文章中有两个观点&#xff0c;1. 文件 内容 属性&…

软著项目推荐 深度学习 opencv python 实现中国交通标志识别

文章目录 0 前言1 yolov5实现中国交通标志检测2.算法原理2.1 算法简介2.2网络架构2.3 关键代码 3 数据集处理3.1 VOC格式介绍3.2 将中国交通标志检测数据集CCTSDB数据转换成VOC数据格式3.3 手动标注数据集 4 模型训练5 实现效果5.1 视频效果 6 最后 0 前言 &#x1f525; 优质…

Presto+Alluxio数据平台实战

数新网络&#xff0c;让每个人享受数据的价值https://xie.infoq.cn/link?targethttps%3A%2F%2Fwww.datacyber.com%2F 一、Presto & Alluxio简介 Presto Presto是由Facebook开发的开源大数据分布式高性能 SQL查询引擎。 起初&#xff0c;Facebook使用Hive来进行交互式查询…

基于51单片机电子钟闹钟LCD1602显示proteus仿真设计

基于51单片机的LCD1602电子钟闹钟proteus仿真设计 基于51单片机的LCD1602电子钟闹钟proteus仿真设计功能介绍&#xff1a;仿真图&#xff1a;原理图&#xff1a;设计报告&#xff1a;程序&#xff1a;器件清单&#xff1a;资料清单&&下载链接&#xff1a; 基于51单片机…