(四)「消息队列」之 RabbitMQ 路由(使用 .NET 客户端)

0、引言

先决条件

本教程假设 RabbitMQ 已安装并且正在 本地主机 的标准端口(5672)上运行。如果您使用了不同的主机、端口或凭证,则要求调整连接设置。

获取帮助

如果您在阅读本教程时遇到问题,可以通过邮件列表或者 RabbitMQ 社区 Slack 与 RabbitMQ 官方取得联系。

在上一篇教程中我们构建了一个简单的日志系统,得以向许多接收者(receiver)广播日志消息。

在本教程中我们将会为该系统添加一个特性 —— 我们将使“仅订阅消息的一个子集”成为可能。例如,我们将能够只将关键错误消息定向到日志文件(以节省磁盘空间),与此同时仍然能够在控制台上打印所有的日志消息。

原文链接:https://www.rabbitmq.com/tutorials/tutorial-four-dotnet.html

1、绑定

在上一篇例程中我们已经建立过绑定了。您也许能够回想起,代码类似于:

channel.QueueBind(queue: queueName,exchange: "logs",routingKey: string.Empty);

绑定 即表征 交换机队列 之间的关系。这可以简单地理解为:队列对来自此交换机的消息感兴趣。

绑定可以接收一个额外的 routingKey (路由键)参数。为了避免与 BasicPublish 方法的一个参数混淆,我们现在将其称之为 binding key (绑定键)。下面演示了我们如何创建一个带有键的绑定:

channel.QueueBind(queue: queueName,exchange:"direct_logs",routingKey: "black");

binding key 的含义取决于交换机的类型。比如我们之前使用的 fanout 交换机会简单地忽略它的值。

2、直连交换机(Direct exchange)

在上一篇教程中,我们的日志系统向所有的消费者广播所有消息。我们希望对其进行拓展,以允许根据消息的严重程度过滤消息。比如,我们也许想要向磁盘写入日志消息的脚本文件只接收关键错误,而不是在警告或者信息日志消息上浪费磁盘空间。

但我们在上一篇教程使用的 fanout 扇出交换机并没有给我们如此大的灵活性 —— 它只能够进行无意识的广播。

相反,我们将会使用一个 direct 直连交换机。其背后的路由算法很简单 —— 将消息路由到其 routing key 与队列的 binding key 完全匹配的队列。

为了说明这一点,请考虑如下配置:

direct-exchange

在这个配置中,我们可以看到绑定了两个队列的 direct 直连交换机 X。第一个队列使用 orange 绑定键绑定;而第二个队列拥有两个绑定,一个绑定键为 black,另一个绑定键为 green

在这样的配置下,被发布到交换机中带有 orange 路由键的消息将会被路由到 Q1 队列。而带有 black 或者 green 路由键的消息将会去往 Q2 队列。其他所有消息将会被丢弃。

3、多个绑定

direct-exchange-multiple

使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以使用 black 绑定键在 XQ1 之间添加绑定。在那样的情况下,direct 直连交换机表现得像是 fanout 扇出交换机:向所有匹配的队列广播消息。带有 black 路由键的消息将同时传递给 Q1Q2 双方。

4、发送日志

我们将在日志系统中使用这个模型,我们将消息发送至 direct 直连交换机而不是 fanout 扇出交换机。我们提供日志严重程序作为 routing key 路由键。这样接收脚本就能够选择其想要接收的严重程度。让我们首先关注发送日志:

如常,首先我们需要创建一个交换机:

channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);

我们已经准备好发送一条消息:

var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",routingKey: severity,basicProperties: null,body: body);

为了简化问题我们假定 ‘严重程度’ 可以是 infowarningerror 中的一个。

5、订阅

接收消息的方式工作与上一篇教程类似,但有一点例外 —— 我们将会为我们感兴趣的每个 ‘严重程度’ 创建新的绑定。

var queueName = channel.QueueDeclare().QueueName;foreach(var severity in args)
{channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: severity);
}

6、将所有的东西放到一起

tutorial-four

EmitLogDirect.cs 类的代码:

using System.Text;
using RabbitMQ.Client;var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);var severity = (args.Length > 0) ? args[0] : "info";
var message = (args.Length > 1)? string.Join(" ", args.Skip(1).ToArray()): "Hello World!";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "direct_logs",routingKey: severity,basicProperties: null,body: body);
Console.WriteLine($" [x] Sent '{severity}':'{message}'");Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

ReceiveLogsDirect.cs 的代码:

using System.Text;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;var factory = new ConnectionFactory { HostName = "localhost" };using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);
// declare a server-named queue
var queueName = channel.QueueDeclare().QueueName;if (args.Length < 1)
{Console.Error.WriteLine("Usage: {0} [info] [warning] [error]",Environment.GetCommandLineArgs()[0]);Console.WriteLine(" Press [enter] to exit.");Console.ReadLine();Environment.ExitCode = 1;return;
}foreach (var severity in args)
{channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: severity);
}Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);var routingKey = ea.RoutingKey;Console.WriteLine($" [x] Received '{routingKey}':'{message}'");
};
channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();

像往常一样创建项目(建议参考教程一)

如果您只想要保存 warningerror(没有 info)日志消息到文件中,只需要打开一个控制台并输入:

cd ReceiveLogsDirect
dotnet run warning error > logs_from_rabbit.log

如果您想要在屏幕上看见所有的日志消息,打开一个新的终端并尝试:

cd ReceiveLogsDirect
dotnet run info warning error
# => [*] Waiting for logs. To exit press CTRL+C

例如,要发送一个 error 日志消息,只需要输入:

cd EmitLogDirect
dotnet run error "Run. Run. Or it will explode."
# => [x] Sent 'error':'Run. Run. Or it will explode.'

(EmitLogDirect.cs 和 ReceiveLogsDirect.cs 的完整源码)

运行效果:
在这里插入图片描述

要了解如何基于特定模式侦听消息,请移步至教程五。

5、生产[非]适用性免责声明

请记住,本教程和其他教程都是教程。他们一次展示一个新概念,可能会有意地过度简化一些东西,而忽略其他东西。例如,为了简洁起见,连接管理、错误处理、连接恢复、并发性和指标收集等主题在很大程度上被省略了。这种简化的代码不应该被认为可以用于生产。

在发布您的应用之前,请先查看其他文档。我们特别推荐以下指南:发布者确认和消费者确认,生产清单和监控。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/29422.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

ylb-项目简介

1、各模块服务功能 注&#xff1a;其部分实体类、接口、mapper文件由MyBatis逆向工程生成。 2、Maven管理&#xff08;多模块&#xff0c;继承和聚合&#xff09; 2.1 parent模块 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"…

CSS——基础知识及使用

CSS 是什么 CSS是层叠样式表 (Cascading Style Sheets)的简写.CSS 能够对网页中元素位置的排版进行像素级精确控制, 实现美化页面的效果. 能够做到页面的样式和结构分离。 基本语法规范 选择器 { 一条/N条声明 } 选择器决定针对谁修改 (找谁)声明决定修改啥. (干啥)声明的…

OpenCv之Canny

目录 一、自适应阈值 二、边缘检测Canny 一、自适应阈值 引入前提:在前面的部分我们使用是全局闻值&#xff0c;整幅图像采用同一个数作为闻值。当时这种方法并不适应与所有情况&#xff0c;尤其是当同一幅图像上的不同部分的具有不同亮度时。这种情况下我们需要采用自适应闻…

记录--再也不用手动改package.json的版本号

这里给大家分享我在网上总结出来的一些知识&#xff0c;希望对大家有所帮助 本文的起因是有在代码仓库发包后&#xff0c;同事问我“为什么package.json 里的版本还是原来的&#xff0c;有没有更新&#xff1f;”&#xff0c;这个时候我意识到&#xff0c;我们完全没有必要在每…

“掌握更多的快速排序技巧:三路划分、双路快排和非递归的深入理解”

快速排序是一种基于分治思想的排序算法&#xff0c;它能够以极快的速度将一个乱序的数组重新排列成有序的序列。不仅如此&#xff0c;快速排序还具有简洁的实现代码和良好的可扩展性&#xff0c;成为最受欢迎的排序算法之一。接下来&#xff0c;让我带你了解一下它的魅力吧&…

接入端口与中继端口

交换机端口是支持 IT 的基本组件&#xff0c;可实现网络通信。这些有线硬件设备负责连接并允许在不同设备和连接到其端口的网络部分之间进行数据传输。由于网络管理员在确保网络连接和可用性方面发挥着关键作用&#xff0c;因此网络管理员必须清楚地了解、映射和查看其网络交换…

【有功功率、无功功率】可再生能源配电馈线的鲁棒经济调度研究[IEEE13节点](Matlab代码实现)

&#x1f4a5;1 概述 "有功功率和无功功率" 是与电力系统中能量传输和功率控制相关的两个重要概念。 有功功率&#xff08;Active Power&#xff09;是指电力系统中传输和消耗能量的功率&#xff0c;也被称为实功功率。它负责提供电力系统中的实际电能需求&#xf…

【EasyExcel】在SpringBoot+VUE项目中引入EasyExcel实现对数据的导出(封装工具类)

在SpringBootVUE项目中引入EasyExcel实现导入导出 一、引入EasyExcel 通过maven引入&#xff0c;坐标如下&#xff1a; <dependency><groupId>com.alibaba</groupId><artifactId>easyexcel-core</artifactId><version>3.3.2</version…

【Win10系统下载Python3】

Python3官网&#xff1a;https://www.python.org/downloads/windows/ 注

微服务Day3——Nacos配置管理\Feign远程调用\Gateway网关

一、Nacos配置管理 1、统一配置管理 当微服务部署的实例越来越多&#xff0c;达到数十、数百时&#xff0c;逐个修改微服务配置就会让人抓狂&#xff0c;而且很容易出错。我们需要一种统一配置管理方案&#xff0c;可以集中管理所有实例的配置。 Nacos一方面可以将配置集中管理…

性能测试之性能问题分析

开始性能测试前需要了解的内容&#xff1a; 1、项目具体需求。 2、指标&#xff1a;响应时间在多少以内&#xff0c;并发数多少&#xff0c;tps多少&#xff0c;总tps多少&#xff0c;稳定性交易总量多少&#xff0c;事务成功率&#xff0c;交易波动范围&#xff0c;稳定运行时…

ROS:pluginlib

目录 一、前言二、概念三、作用四实际用例4.1需求4.2流程4.3准备4.4创建基类4.5创建插件4.6注册插件4.7构建插件库4.8使插件可用于ROS工具链4.8.1配置xml4.8.2导出插件 4.9使用插件4.10执行 一、前言 pluginlib直译是插件库&#xff0c;所谓插件字面意思就是可插拔的组件&…