go消息队列RabbitMQ - 订阅模式-fanout

1、发布订阅 

订阅模式,消息被路由投递给多个队列,一个消息被多个消费者获取。

1) 可以有多个消费者
2) 每个消费者有自己的queue(队列)
3) 每个队列都要绑定到Exchange(交换机)
4) 生产者发送的消息,只能发送到交换机,交换机来决定要发给哪个队列,生产者无法决定。
5) 交换机把消息发送给绑定过的所有队列
6) 队列的消费者都能拿到消息。实现一条消息被多个消费者消费

  • 相关场景:邮件群发,群聊天,广播(广告)

2、Exchanges(交换器)

消息产生者将消息放入交换机,交换机发布订阅把消息发送到所有消息队列中,对应消息队列的消费者拿到消息进行消费

有几种交换器类型可用:directtopicheaders 和 fanout。我们将集中讨论最后一个——fanout

2.1 创建交换器

创建一个这种类型的交换器,并给它起个名字叫logs

err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments
)

fanout(扇出)交换器非常简单。正如你可能从名称中猜测的那样,它只是将接收到的所有消息广播到它知道的所有队列中。 

2.2 临时队列

也是自动删除队列吗,和普通队列在使用上没有什么区别,唯一的区别是,当消费者断开连接时,队列将会被删除。自动删除队列允许的消费者没有限制,也就是说当这个队列上最后一个消费者断开连接才会执行删除。

自动删除队列只需要在声明队列时,设置属性auto-delete标识为true即可。系统声明的随机队列,缺省就是自动删除的。

q, err := ch.QueueDeclare("",    // 空字符串作为队列名称false, // 非持久队列false, // delete when unusedtrue,  // 独占队列(当前声明队列的连接关闭后即被删除)false, // no-waitnil,   // arguments
)

上述方法返回时,生成的队列实例包含RabbitMQ生成的随机队列名称。例如,它可能看起来像amq.gen-JzTY20BRgKO-HjmUJj0wLg

2.3 交换器与队列绑定

 

交换器和队列之间的关系称为绑定

err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,
)

从现在开始,logs交换器将会把消息添加到我们的队列中。

2.4 发布消息到交换机

例如,发布到fanout交换器:

body := bodyFrom(os.Args)
err = ch.Publish("logs", // exchange"",     // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})

 3 完整代码

产生日志消息的生产程序与上一教程看起来没有太大不同。最重要的变化是我们现在希望将消息发布到logs交换器,而不是空的消息交换器。发送时,我们需要提供一个routingKey,但是对于fanout型交换器,它的值可以被忽略(传空字符串)。下面是emit_log.go脚本的代码:

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")body := bodyFrom(os.Args)err = ch.Publish("logs", // exchange"",     // routing keyfalse,  // mandatoryfalse,  // immediateamqp.Publishing{ContentType: "text/plain",Body:        []byte(body),})failOnError(err, "Failed to publish a message")log.Printf(" [x] Sent %s", body)
}func bodyFrom(args []string) string {var s stringif (len(args) < 2) || os.Args[1] == "" {s = "hello"} else {s = strings.Join(args[1:], " ")}return s
}

(emit_logs.go源码)

如你所见,在建立连接之后,我们声明了交换器。此步骤是必需的,因为禁止发布到不存在的交换器。

如果没有队列绑定到交换器,那么消息将丢失,但这对我们来说是ok的。如果没有消费者在接收,我们可以安全地丢弃该消息。

receive_logs.go的代码:

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()err = ch.ExchangeDeclare("logs",   // name"fanout", // typetrue,     // durablefalse,    // auto-deletedfalse,    // internalfalse,    // no-waitnil,      // arguments)failOnError(err, "Failed to declare an exchange")q, err := ch.QueueDeclare("",    // namefalse, // durablefalse, // delete when unusedtrue,  // exclusivefalse, // no-waitnil,   // arguments)failOnError(err, "Failed to declare a queue")err = ch.QueueBind(q.Name, // queue name"",     // routing key"logs", // exchangefalse,nil,)failOnError(err, "Failed to bind a queue")msgs, err := ch.Consume(q.Name, // queue"",     // consumertrue,   // auto-ackfalse,  // exclusivefalse,  // no-localfalse,  // no-waitnil,    // args)failOnError(err, "Failed to register a consumer")forever := make(chan bool)go func() {for d := range msgs {log.Printf(" [x] %s", d.Body)}}()log.Printf(" [*] Waiting for logs. To exit press CTRL+C")<-forever
}

(receive_logs.go源码)

如果要将日志保存到文件,只需打开控制台并输入:

go run receive_logs.go > logs_from_rabbit.log

如果希望在屏幕上查看日志,请切换到一个新的终端并运行:

go run receive_logs.go

当然,要发出日志,请输入:

go run emit_log.go

使用rabbitmqctl list_bindings命令,你可以验证代码是否确实根据需要创建了绑定关系和队列。在运行两个receive_logs.go程序后,你应该看到类似以下内容:

sudo rabbitmqctl list_bindings
# => Listing bindings ...
# => logs    exchange        amq.gen-JzTY20BRgKO-HjmUJj0wLg  queue           []
# => logs    exchange        amq.gen-vso0PVvyiRIL2WoV3i48Yg  queue           []
# => ...done.

对结果的解释很简单:数据从logs交换器进入了两个由服务器分配名称的队列。这正是我们想要的。

 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/458254.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

龙测科技荣获2023年度技术生态构建奖

本月&#xff0c;由极客传媒举办的“有被Q到”2024 InfoQ 极客传媒合作伙伴年会顺利举办&#xff0c;龙测科技喜获2023年度技术生态构建奖。 InfoQ是首批将Node.js、HTML5、Docker等技术全面引入中国的技术媒体之一&#xff0c;秉承“扎根社区、服务社区、引领社区”的理念&…

2641. 二叉树的堂兄弟节点 II - 力扣(LeetCode)

题目描述 给你一棵二叉树的根 root &#xff0c;请你将每个节点的值替换成该节点的所有 堂兄弟节点值的和 。 如果两个节点在树中有相同的深度且它们的父节点不同&#xff0c;那么它们互为 堂兄弟 。 请你返回修改值之后&#xff0c;树的根 root 。 注意&#xff0c;一个节…

【RL】Bellman Equation (贝尔曼等式)

Lecture2: Bellman Equation State value 考虑grid-world的单步过程&#xff1a; S t → A t R t 1 , S t 1 S_t \xrightarrow[]{A_t} R_{t 1}, S_{t 1} St​At​ ​Rt1​,St1​ t t t, t 1 t 1 t1&#xff1a;时间戳 S t S_t St​&#xff1a;时间 t t t时所处的sta…

机器人工具箱学习(一)

一、机器人工具箱介绍 机器人工具箱是由来自昆士兰科技大学的教授Peter Corke开发的&#xff0c;被广泛用于机器人进行仿真&#xff08;主要是串联机器人&#xff09;。该工具箱支持机器人一些基本算法的功能&#xff0c;例如三维坐标中的方向表示&#xff0c;运动学、动力学模…

CentOS7搭建Hadoop集群

准备工作 1、准备三台虚拟机&#xff0c;参考&#xff1a;CentOS7集群环境搭建&#xff08;3台&#xff09;-CSDN博客 2、配置虚拟机之间免密登录&#xff0c;参考&#xff1a;CentOS7集群配置免密登录-CSDN博客 3、虚拟机分别安装jdk&#xff0c;参考&#xff1a;CentOS7集…

Android14音频进阶:MediaPlayerService如何启动AudioTrack 下篇(五十六)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒体系统工程师系列【原创干货持续更新中……】🚀 人生格言: 人生从来没有捷径,只…

机器学习-线性回归法

线性回归算法 解决回归问题思想简单&#xff0c;实现容易许多强大的非线性模型的基础结果具有很好的可解释性蕴含机器学习中的很多重要思想 样本特征只有一个&#xff0c;称为&#xff1a;简单线性回归 通过分析问题&#xff0c;确定问题的损失函数或者效用函数 通过最优化…

Halcon机器视觉实战----提取水平方向缝隙区域

前言 如何从一块区域内找到水平方向的缝隙区域&#xff08;不是高斯线条&#xff0c;从图像中提取&#xff0c;而是从区域内提取&#xff0c;考虑到了区域所在的方向&#xff09;&#xff1b; dev_close_window () dev_open_window (0, 0, 800, 800, black, WindowHandle) re…

从零开始手写mmo游戏从框架到爆炸(六)— 消息处理工厂

就好像门牌号一样&#xff0c;我们需要把消息路由到对应的楼栋和楼层&#xff0c;总不能像菜鸟一样让大家都来自己找数据吧。 首先这里我们参考了rabbitmq中的topic与tag模型&#xff0c;topic对应类&#xff0c;tag对应方法。 新增一个模块&#xff0c;专门记录路由eternity-…

[Tomcat问题]--使用Tomcat 10.x部署项目时,出现实例化Servlet类[xxx]异常

[Tomcat问题]–使用Tomcat 10.x部署项目时&#xff0c;出现实例化Servlet类[xxx]异常 本片博文在知乎同步更新 环境 OS: Windows 11 23H2Java Version: java 21.0.1 2023-10-17 LTSIDE: IntelliJ IDEA 2023.3.3Maven: Apache Maven 3.9.6Tomcat: Tomcat 10.1.18 ReleasedSer…

Netty源码 之 ByteBuf自适应扩缩容源码

Netty体系如何使得ByteBuf根据实际IO收发数据场景进行自适应扩容缩容的&#xff1f; IO收发数据的过程&#xff1a; read 读取&#xff08;"I"&#xff09;&#xff1a;网卡硬件通过网络传输介质读取对端传输过来的数据&#xff0c;网卡硬件再把数据写到recv-socke…

论文阅读——MP-Former

MP-Former: Mask-Piloted Transformer for Image Segmentation https://arxiv.org/abs/2303.07336 mask2former问题是&#xff1a;相邻层得到的掩码不连续&#xff0c;差别很大 denoising training非常有效地稳定训练时期之间的二分匹配。去噪训练的关键思想是将带噪声的GT坐标…