Net中RabbitMq.Client7.0通过依赖注入DI来管理RabbitMQ客户端的生命周期

news/2024/12/4 16:09:19/文章来源:https://www.cnblogs.com/netcore5/p/18586553

在 RabbitMQ.Client 7.0.0 版本中,
IModel 在 RabbitMQ.Client 7.0.0-alpha2 版本中已经被重命名,现在应该使用 IChannel 替代 IModel,
IChannel 不再提供 CreateBasicProperties 方法。需要直接使用 BasicProperties 类来创建消息属性。

前言

关于RabbitMq的更多知识点在: https://www.dotnetshare.com

公众号:Net分享,欢迎关注

下面是通过依赖注入(DI)来管理RabbitMQ客户端的生命周期

1. 安装RabbitMQ客户端库

首先,你需要安装RabbitMQ的.NET客户端库。这可以通过NuGet包管理器来完成:

Install-Package RabbitMQ.Client

2. 配置RabbitMQ连接字符串

在你的appsettings.json文件中,添加RabbitMQ的连接配置:

{"RabbitMQ": {"HostName": "localhost","Port": 5672,"UserName": "guest","Password": "guest"}
}

3. 创建RabbitMQ服务配置类

创建一个配置类来封装RabbitMQ的连接信息:

public class RabbitMQOptions
{public string HostName { get; set; }public int Port { get; set; }public string UserName { get; set; }public string Password { get; set; }
}

4. 注册RabbitMQ服务

Startup.cs或程序启动时的配置方法中,注册RabbitMQ服务:

// 绑定RabbitMQ配置
builder.Services.Configure<RabbitMQOptions>(builder.Configuration.GetSection("RabbitMQ"));// 注册RabbitMQ连接工厂
builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>(sp =>
{var options = sp.GetRequiredService<IOptions<RabbitMQOptions>>().Value;var factory = new ConnectionFactory() { HostName = options.HostName, Port = options.Port, UserName = options.UserName, Password = options.Password };return new RabbitMQConnection(factory);
}); // 添加RabbitMQService的服务注册
builder.Services.AddSingleton<RabbitMQService>();

5. 创建RabbitMQ连接和通道工厂

创建一个工厂类来管理RabbitMQ的连接和通道:

    public interface IRabbitMQConnection : IDisposable{Task<IChannel> CreateChannel();}public class RabbitMQConnection : IRabbitMQConnection{private readonly ConnectionFactory _factory;private readonly IConnection _connection;private bool _isDisposed;public RabbitMQConnection(ConnectionFactory factory){_factory = factory ?? throw new ArgumentNullException(nameof(factory));_connection = factory.CreateConnectionAsync().Result;}public async Task<IChannel> CreateChannel(){EnsureNotDisposed();return await _connection.CreateChannelAsync();}public void Dispose(){Dispose(true);GC.SuppressFinalize(this);}protected virtual void Dispose(bool disposing){if (_isDisposed) return;if (disposing){// Free any other managed objects here.}// Free any unmanaged objects here._connection.Dispose();_isDisposed = true;}~RabbitMQConnection(){Dispose(false);}private void EnsureNotDisposed(){if (_isDisposed){throw new ObjectDisposedException(nameof(RabbitMQConnection));}}}

6. 使用RabbitMQ服务

在你的服务或消费者中,注入IRabbitMQConnection并使用它来创建模型(channel):

using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text.Json;
using System.Text;public class RabbitMQService
{private readonly IRabbitMQConnection _connection;public RabbitMQService(IRabbitMQConnection connection){_connection = connection ?? throw new ArgumentNullException(nameof(connection));}public async Task SendAsync(string exchange, string routingKey, object message, bool mandatory = false, CancellationToken cancellationToken = default){try{using var channel = _connection.CreateChannel();var mesjson = JsonSerializer.Serialize(message);Console.WriteLine("发送消息:" + mesjson);var body = Encoding.UTF8.GetBytes(mesjson);var properties = new RabbitMQ.Client.BasicProperties{Persistent = true // 设置消息持久化};channel.BasicPublishAsync(exchange, routingKey, false, properties, body, cancellationToken);}catch (OperationCanceledException ex){Console.WriteLine($"Operation was canceled: {ex.Message}");//throw; // Re-throw if you want to propagate the cancellation}catch (Exception ex){Console.WriteLine($"An error occurred: {ex.Message}");//throw; // Re-throw if you want to propagate the error}  }public async Task ReceiveAsync(string queueName, Func<IChannel, byte[], Task> callback, CancellationToken cancellationToken = default){var channel = _connection.CreateChannel();await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null);var consumer = new AsyncEventingBasicConsumer(channel);consumer.ReceivedAsync += async (model, ea) =>{var body = ea.Body.ToArray();try{// 直接传递 model 和 body 给 callback,不需要转换await callback(channel, body);}finally{//await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);}};await channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken);// Prevent the method from returning immediatelyawait Task.Delay(-1, cancellationToken);}
}

7.生产端和消费端的使用

消费
var app = builder.Build();var rabbitMQService = app.Services.GetRequiredService<RabbitMQService>();
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;// 启动消息接收
var receiveTask = rabbitMQService.ReceiveAsync("Test", async (channel, body) =>
{// 处理接收到的消息//string message = Encoding.UTF8.GetString(body);//Console.WriteLine($"收到消息 message: {message}");//// 确认消息//await channel.BasicAckAsync(deliveryTag: default, multiple: false, cancellationToken);}, cancellationToken);
生产端

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/846708.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

论文泛读《PICCOLO : Exposing Complex Backdoors in NLP Transformer Models》

发表时间:2022 期刊会议:IEEE Symposium on Security and Privacy (SP) 论文单位:Purdue University 论文作者:Yingqi Liu, Guangyu Shen, Guanhong Tao, Shengwei An, Shiqing Ma, Xiangyu Zhang 方向分类:Backdoor Attack 论文链接 开源代码摘要 后门可以被注入到NLP模型…

数据库管理与开发基础教程复习(二)

Oraclec 常用工具 简单介绍: SQL*PLUS 由开始菜单启动和命令行启动 企业管理器 OEM Oracle 数据库集成管理平台 SOL PLUS常用命令 连接与断开数据库连接命令: -connect -disconnect 查看表结构命令: describe (desc) SOL*PLUS 环境参数设置命令 -set -arraysize -linesize -…

【Git基础】Gitee/GitHub使用

gitee介绍 gitee译为码云,是国内创建的一个类似与github的网站,可以上传自己的代码放在云端保存,下面介绍gitee如何使用,github类似使用操作。 git工具安装及基础 【git工具安装及基本使用】 gitee仓库创建.gitignore文件作用 git提交时需要忽略部分文件或目录本地仓库设置…

威联通-002 Docker镜像下载

@目录前言操作大纲1.登录阿里云镜像服务2.创建个人容器3.GitHub复制代码到本地库、配置信息4.点击修改自己想要拉取的镜像5.进入阿里云查看6.创建容器参考(可用)前言 由于国内几乎所有的docker库的封锁,现在不能进行docker镜像的拉取操作,尝试很多种方法都失败了,最后总结…

Mysql 一主一从配置

Mysql 一主一从配置 环境信息ip地址 主机信息 角色 mysql版本192.168.1.19 S600 主 8.0.40-0ubuntu0.20.04.1192.168.1.20 H840 从 8.0.40-0ubuntu0.20.04.1本环境已完成2台Mysql单机安装,Mysql单机安装操作文档 具体操作 Mysql主机配置 配置文件修改修改配置Master配置/etc/m…

Mycat2+Mysql一主一从实现读写分离配置

Mycat2+Mysql一主一从实现读写分离配置 前置配置Mysql一主一从搭建 Mycat2环境搭建环境信息ip地址 软件 角色 版本192.168.1.19 Mysql 主 8.0.40-0ubuntu0.20.04.1192.168.1.19 Mycat2 —— 1.21-release-3-14192.168.1.20 Mysql 从 8.0.40-0ubuntu0.20.04.1操作步骤 1. 修改并…

体验iOS手机群控免费苹果手机免越狱群控:银河中控全面解析

在多设备管理的需求日益增长的今天,能够高效地管理和控制多台iOS设备成为了一个重要的课题。对于不想或不能进行越狱操作的用户来说,找到一种安全、合法且高效的解决方案显得尤为重要。本章将深入探讨一款名为“银河中控”的免费苹果手机免越狱群控系统,帮助您了解其功能特性…

k8s~关于非常啰嗦的标签和选择器

总感觉k8s中定义的deplyment和service非常的啰嗦,尤其是在选择器的定义上,但没办法,它的设计总有它的道理。svc(spec.selector.app)deployment(metadata.labels.app,spec.selector.matchLabels.app)pods(metadata.labels.app)nginx的部署 下面是一个 Kubernetes YAML 文件示…

Windows11中安装SQL Server 2019

介绍 Microsoft SQL Server 是一种关系数据库管理系统 (RDBMS)。 应用程序和工具连接到 SQL Server 实例或数据库,并使用 Transact-SQL (T-SQL) 进行通信。 SQL Server Management Studio (SSMS) 是一种集成环境,用于管理任何 SQL 基础结构。 使用 SSMS 访问、配置、管理和开…

python项目安装虚拟环境

滴水成冰,世间不存在毫无意义的付出,时间终会给你答案。

边坡检测解决方案,根据实际需求来定制方案

大家好,我是星创易联的林工。今天跟大家聊一聊我们做边坡监测的那些事儿。 ​ (参考:key-iot.com.cn ) 说到边坡监测啊,最重要的就是安全。我们公司这些年一直在这个领域深耕,积累了不少经验。来,我给大家详细说说我们是怎么做的。 首先啊,我们要先摸清楚这个边坡的脾气。用我…

【科普系列】ICMPv6协议基础简介

引言在科普介绍文章《IPv6协议—互联网通信协议第六版》中介绍了IPv6协议,这次的科普主题是ICMPv6(Internet Control Message Protocol version 6),它作为IPv6网络中的核心协议之一,是网络通信中不可或缺的一部分。ICMPv6的设计继承了IPv4中ICMPv4协议的基本功能,然而,它…