说明:本文基于 RabbitMQ.Client 7.0.0 版本。
IModel 自 RabbitMQ.Client 7.0.0-alpha2 起已被重命名为 IChannel,现在应该使用 IChannel 替代 IModel;
IChannel 不再提供 CreateBasicProperties 方法,需要直接使用 BasicProperties 类来创建消息属性。
前言
关于RabbitMq的更多知识点在: https://www.dotnetshare.com
公众号:Net分享,欢迎关注
下面是通过依赖注入(DI)来管理RabbitMQ客户端的生命周期
1. 安装RabbitMQ客户端库
首先,你需要安装RabbitMQ的.NET客户端库。这可以通过NuGet包管理器来完成:
Install-Package RabbitMQ.Client
2. 配置RabbitMQ连接字符串
在你的 appsettings.json 文件中,添加RabbitMQ的连接配置:
{"RabbitMQ": {"HostName": "localhost","Port": 5672,"UserName": "guest","Password": "guest"}
}
3. 创建RabbitMQ服务配置类
创建一个配置类来封装RabbitMQ的连接信息:
/// <summary>
/// Connection settings for a RabbitMQ broker.
/// Every property has a default so that an absent or partial configuration
/// section still yields usable values instead of <c>null</c> strings and port 0.
/// </summary>
public class RabbitMQOptions
{
    /// <summary>Host name or address of the broker. Defaults to <c>localhost</c>.</summary>
    public string HostName { get; set; } = "localhost";

    /// <summary>TCP port of the broker. Defaults to <c>5672</c>.</summary>
    public int Port { get; set; } = 5672;

    /// <summary>User name used to authenticate. Defaults to <c>guest</c>.</summary>
    public string UserName { get; set; } = "guest";

    /// <summary>Password used to authenticate. Defaults to <c>guest</c>.</summary>
    public string Password { get; set; } = "guest";
}
4. 注册RabbitMQ服务
在 Program.cs(或 Startup.cs 等程序启动时的配置方法)中,注册RabbitMQ服务:
// Bind the "RabbitMQ" configuration section to RabbitMQOptions.
builder.Services.Configure<RabbitMQOptions>(builder.Configuration.GetSection("RabbitMQ"));

// Register the RabbitMQ connection wrapper as a singleton, built from the bound options.
builder.Services.AddSingleton<IRabbitMQConnection, RabbitMQConnection>(serviceProvider =>
{
    RabbitMQOptions settings = serviceProvider.GetRequiredService<IOptions<RabbitMQOptions>>().Value;

    var connectionFactory = new ConnectionFactory();
    connectionFactory.HostName = settings.HostName;
    connectionFactory.Port = settings.Port;
    connectionFactory.UserName = settings.UserName;
    connectionFactory.Password = settings.Password;

    return new RabbitMQConnection(connectionFactory);
});

// Register RabbitMQService so it can be resolved from the container.
builder.Services.AddSingleton<RabbitMQService>();
5. 创建RabbitMQ连接和通道工厂
创建一个工厂类来管理RabbitMQ的连接和通道:
/// <summary>
/// A disposable wrapper around a RabbitMQ connection from which channels are created.
/// </summary>
public interface IRabbitMQConnection : IDisposable
{
    /// <summary>Creates a new channel on the wrapped connection.</summary>
    /// <returns>A task that yields the newly created channel.</returns>
    Task<IChannel> CreateChannel();
}

/// <summary>
/// Default <see cref="IRabbitMQConnection"/>: opens one connection in the constructor
/// and creates channels on it until disposed.
/// </summary>
public class RabbitMQConnection : IRabbitMQConnection
{
    private readonly IConnection _connection;
    private bool _isDisposed;

    /// <summary>Opens a connection using <paramref name="factory"/>.</summary>
    /// <param name="factory">Factory that supplies the connection settings.</param>
    /// <exception cref="ArgumentNullException"><paramref name="factory"/> is null.</exception>
    public RabbitMQConnection(ConnectionFactory factory)
    {
        if (factory is null)
        {
            throw new ArgumentNullException(nameof(factory));
        }

        // A constructor cannot await, so this call blocks. GetAwaiter().GetResult()
        // rethrows the original exception, whereas .Result wraps it in AggregateException.
        _connection = factory.CreateConnectionAsync().GetAwaiter().GetResult();
    }

    /// <summary>Creates a new channel on the wrapped connection.</summary>
    /// <returns>A task that yields the newly created channel.</returns>
    /// <exception cref="ObjectDisposedException">This instance has already been disposed.</exception>
    public async Task<IChannel> CreateChannel()
    {
        EnsureNotDisposed();
        return await _connection.CreateChannelAsync();
    }

    /// <summary>Disposes the wrapped connection. Calling it more than once is a no-op.</summary>
    public void Dispose()
    {
        Dispose(true);
        // Kept for derived types that add a finalizer; this class itself has none.
        GC.SuppressFinalize(this);
    }

    /// <summary>Releases the resources held by this instance.</summary>
    /// <param name="disposing">
    /// True when called from <see cref="Dispose()"/>; false when called from a finalizer.
    /// </param>
    protected virtual void Dispose(bool disposing)
    {
        if (_isDisposed)
        {
            return;
        }

        if (disposing)
        {
            // The connection is a managed object, so it must only be touched on the
            // explicit Dispose path and never from a finalizer.
            _connection.Dispose();
        }

        _isDisposed = true;
    }

    // Throws when this wrapper has already been disposed.
    private void EnsureNotDisposed()
    {
        if (_isDisposed)
        {
            throw new ObjectDisposedException(nameof(RabbitMQConnection));
        }
    }
}
6. 使用RabbitMQ服务
在你的服务或消费者中,注入 IRabbitMQConnection,并使用它来创建通道(channel):
using RabbitMQ.Client.Events;
using RabbitMQ.Client;
using System.Text.Json;
using System.Text;

/// <summary>
/// Publishes and consumes JSON messages over channels obtained from an
/// <see cref="IRabbitMQConnection"/>.
/// </summary>
public class RabbitMQService
{
    private readonly IRabbitMQConnection _connection;

    /// <summary>Creates the service on top of the given connection wrapper.</summary>
    /// <param name="connection">Source of channels.</param>
    /// <exception cref="ArgumentNullException"><paramref name="connection"/> is null.</exception>
    public RabbitMQService(IRabbitMQConnection connection)
    {
        _connection = connection ?? throw new ArgumentNullException(nameof(connection));
    }

    /// <summary>
    /// Serializes <paramref name="message"/> to JSON and publishes it as a persistent message.
    /// Failures are written to the console and not rethrown (best-effort send).
    /// </summary>
    /// <param name="exchange">Exchange to publish to.</param>
    /// <param name="routingKey">Routing key of the message.</param>
    /// <param name="message">Object to serialize as the message body.</param>
    /// <param name="mandatory">Forwarded as the publish call's mandatory flag.</param>
    /// <param name="cancellationToken">Token used to cancel the publish.</param>
    public async Task SendAsync(string exchange, string routingKey, object message, bool mandatory = false, CancellationToken cancellationToken = default)
    {
        try
        {
            // CreateChannel returns a Task, so it must be awaited to get the channel itself.
            using var channel = await _connection.CreateChannel();

            var json = JsonSerializer.Serialize(message);
            Console.WriteLine("发送消息:" + json);
            var body = Encoding.UTF8.GetBytes(json);

            var properties = new RabbitMQ.Client.BasicProperties
            {
                Persistent = true, // mark the message as persistent
            };

            // Awaited so the publish finishes before the channel is disposed,
            // and so that a publish failure reaches the catch blocks below.
            await channel.BasicPublishAsync(exchange, routingKey, mandatory, properties, body, cancellationToken);
        }
        catch (OperationCanceledException ex)
        {
            // Deliberately not rethrown: callers treat sending as best-effort.
            Console.WriteLine($"Operation was canceled: {ex.Message}");
        }
        catch (Exception ex)
        {
            // Deliberately not rethrown: callers treat sending as best-effort.
            Console.WriteLine($"An error occurred: {ex.Message}");
        }
    }

    /// <summary>
    /// Declares <paramref name="queueName"/> as a durable queue and consumes it until
    /// <paramref name="cancellationToken"/> is cancelled. Each message is acknowledged
    /// after <paramref name="callback"/> completes successfully; if the callback throws,
    /// the message is left unacknowledged.
    /// </summary>
    /// <param name="queueName">Queue to declare and consume from.</param>
    /// <param name="callback">Handler invoked with the channel and the message body.</param>
    /// <param name="cancellationToken">Token that stops the consumer.</param>
    /// <exception cref="ArgumentNullException"><paramref name="callback"/> is null.</exception>
    /// <exception cref="OperationCanceledException">The token was cancelled.</exception>
    public async Task ReceiveAsync(string queueName, Func<IChannel, byte[], Task> callback, CancellationToken cancellationToken = default)
    {
        if (callback is null)
        {
            throw new ArgumentNullException(nameof(callback));
        }

        // Disposed when this method exits, including when the wait below is cancelled.
        using var channel = await _connection.CreateChannel();

        await channel.QueueDeclareAsync(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null, cancellationToken: cancellationToken);

        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.ReceivedAsync += async (sender, ea) =>
        {
            // Copy the payload into an array, which is what the callback expects.
            var body = ea.Body.ToArray();

            await callback(channel, body);

            // The callback never sees the delivery tag, so the acknowledgement has to
            // happen here. It runs only after the callback succeeds.
            await channel.BasicAckAsync(ea.DeliveryTag, false, cancellationToken);
        };

        await channel.BasicConsumeAsync(queue: queueName, autoAck: false, consumer: consumer, cancellationToken: cancellationToken);

        // Keep the method (and therefore the channel) alive until cancellation is requested.
        await Task.Delay(Timeout.Infinite, cancellationToken);
    }
}
7.生产端和消费端的使用
消费
var app = builder.Build();

var rabbitMQService = app.Services.GetRequiredService<RabbitMQService>();
var cancellationTokenSource = new CancellationTokenSource();
var cancellationToken = cancellationTokenSource.Token;

// Start receiving messages from the "Test" queue. The returned task is kept in
// receiveTask rather than awaited here.
var receiveTask = rabbitMQService.ReceiveAsync("Test", (channel, body) =>
{
    // Handle the received message here, for example:
    //   string message = Encoding.UTF8.GetString(body);
    //   Console.WriteLine($"收到消息 message: {message}");
    //
    // NOTE: the callback is given only the channel and the body, not the delivery tag,
    // so it cannot acknowledge an individual message itself.
    return Task.CompletedTask;
}, cancellationToken);