.Net6使用RabbitMQ实现基于事件总线EventBus通信

news/2024/10/5 8:48:19/文章来源:https://www.cnblogs.com/mingcore/p/18286963

定义

用来管理所有的事件的一种机制就称作为事件总线,包括事件发布,事件存储,事件订阅,事件处理的统称。

作用

实现微服务之间的解耦和消息传递,它允许微服务之间通过发送和订阅事件来进行通信,而不需要彼此了解,不需要直接调用彼此的API或方法。
具体功能如下

  1. 解耦微服务:通过使用EventBus,微服务之间的通信可以变得松散耦合,无需相互依赖。发送事件的微服务不需要知道哪些微服务会订阅该事件,也无需关心事件的处理方式。相反,订阅事件的微服务只需要订阅感兴趣的事件,而不需要关心事件是如何产生的、是谁发送了事件。
  2. 异步通信:使用EventBus可以实现异步通信模式。当某个微服务发送事件时,不会被阻塞等待其他微服务的响应。这样可以提高系统的性能和并发处理能力。
  3. 可扩展性:通过EventBus,可以轻松地添加新的微服务或移除现有的微服务,而不会对系统的其他部分产生影响。这极大地提高了系统的可扩展性和灵活性。
  4. 事件驱动架构:EventBus可以帮助实现事件驱动架构(Event-driven Architecture),其中各个微服务通过发布和订阅事件来进行通信和协作。这种架构模式可以提供更高的灵活性、可维护性和可扩展性。

.Net6使用RabbitMQ实现基于事件总线EventBus通信

Nuget包安装信息

RabbitMQ.Client、Polly、Autofac.Extensions.DependencyInjection、Autofac.Extras.DynamicProxy

配置RabbitMQ节点信息
"RabbitMQ": {"Enabled": true,"Connection": "localhost","UserName": "guest","Password": "guest","Port": 5672,"RetryCount": 3
},
"EventBus": {"Enabled": true,"SubscriptionClientName": "Net6.Core"
}

通过连接工厂初始化RabbitMQ配置信息
public static class RabbitMQSetup
{public static void AddRabbitMQSetup(this IServiceCollection services){if (services == null) throw new ArgumentNullException(nameof(services));if (AppSettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool()){services.AddSingleton<IRabbitMQPersistentConnection>(sp =>{var logger = sp.GetRequiredService<ILogger<RabbitMQPersistentConnection>>();var factory = new ConnectionFactory(){HostName = AppSettings.app(new string[] { "RabbitMQ", "Connection" }),DispatchConsumersAsync = true,factory.UserName = AppSettings.app(new string[] { "RabbitMQ", "UserName" }),factory.Password = AppSettings.app(new string[] { "RabbitMQ", "Password" }),factory.Port = AppSettings.app(new string[] { "RabbitMQ", "Port" }).ObjToInt(),};int retryCount = AppSettings.app(new string[] { "RabbitMQ", "RetryCount" }).ObjToInt();return new RabbitMQPersistentConnection(factory, logger, retryCount);});}}
}

注入RabbitMQ连接配置服务
builder.Services.AddRabbitMQSetup();

新建IntegrationEvent类
/// <summary>
/// 事件模型
/// 基类
/// </summary>
public class IntegrationEvent
{public IntegrationEvent(){Id = Guid.NewGuid();CreationDate = DateTime.UtcNow;}[JsonConstructor]public IntegrationEvent(Guid id, DateTime createDate){Id = id;CreationDate = createDate;}[JsonProperty]public Guid Id { get; private set; }[JsonProperty]public DateTime CreationDate { get; private set; }
}

新建IIntegrationEventHandler接口

定义事件处理程序类

/// <summary>
/// 集成事件处理程序
/// 泛型接口
/// </summary>
/// <typeparam name="TIntegrationEvent"></typeparam>
public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandlerwhere TIntegrationEvent : IntegrationEvent
{Task Handle(TIntegrationEvent @event);
}/// <summary>
/// 集成事件处理程序
/// 基 接口
/// </summary>
public interface IIntegrationEventHandler
{
}

新建IDynamicIntegrationEventHandler接口
/// <summary>
/// 动态集成事件处理程序
/// 接口
/// </summary>
public interface IDynamicIntegrationEventHandler
{Task Handle(dynamic eventData);
}

新建IEventBus接口
/// <summary>
/// 事件总线
/// 接口
/// </summary>
public interface IEventBus
{/// <summary>/// 发布/// </summary>/// <param name="event">事件,参数类必须继承自IntegrationEvent类</param>void Publish(IntegrationEvent @event);//void Publish<TEvent>(TEvent @event) where TEvent : IntegrationEvent;/// <summary>/// 订阅/// </summary>/// <typeparam name="T">订阅的事件类型</typeparam>/// <typeparam name="TH">处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口</typeparam>void Subscribe<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>;/// <summary>/// 取消订阅/// </summary>/// <typeparam name="T">订阅的事件类型</typeparam>/// <typeparam name="TH">处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口</typeparam>void Unsubscribe<T, TH>()where TH : IIntegrationEventHandler<T>where T : IntegrationEvent;/// <summary>/// 动态订阅/// </summary>/// <typeparam name="TH">处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口</typeparam>/// <param name="eventName"></param>void SubscribeDynamic<TH>(string eventName)where TH : IDynamicIntegrationEventHandler;/// <summary>/// 动态取消订阅/// </summary>/// <typeparam name="TH">处理该事件的事件处理程序类型。它是泛型参数,必须实现 IIntegrationEventHandler<TEvent> 接口</typeparam>/// <param name="eventName"></param>void UnsubscribeDynamic<TH>(string eventName)where TH : IDynamicIntegrationEventHandler;
}

新建IEventBusSubscriptionsManager接口
/// <summary>
/// 事件总线订阅管理器
/// 接口
/// </summary>
public interface IEventBusSubscriptionsManager
{bool IsEmpty { get; }event EventHandler<string> OnEventRemoved;void AddDynamicSubscription<TH>(string eventName)where TH : IDynamicIntegrationEventHandler;void AddSubscription<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>;void RemoveSubscription<T, TH>()where TH : IIntegrationEventHandler<T>where T : IntegrationEvent;void RemoveDynamicSubscription<TH>(string eventName)where TH : IDynamicIntegrationEventHandler;bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;bool HasSubscriptionsForEvent(string eventName);Type GetEventTypeByName(string eventName);void Clear();IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);string GetEventKey<T>();
}

新建InMemoryEventBusSubscriptionsManager类
/// <summary>
/// 基于内存
/// 事件总线订阅管理器
/// 单例模式
/// </summary>
public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
{private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;private readonly List<Type> _eventTypes;public event EventHandler<string> OnEventRemoved;public InMemoryEventBusSubscriptionsManager(){_handlers = new Dictionary<string, List<SubscriptionInfo>>();_eventTypes = new List<Type>();}public bool IsEmpty => !_handlers.Keys.Any();public void Clear() => _handlers.Clear();/// <summary>/// 添加动态订阅/// </summary>/// <typeparam name="TH">约束:动态事件处理器接口</typeparam>/// <param name="eventName"></param>public void AddDynamicSubscription<TH>(string eventName)where TH : IDynamicIntegrationEventHandler{DoAddSubscription(typeof(TH), eventName, isDynamic: true);}/// <summary>/// 添加订阅/// </summary>/// <typeparam name="T">约束:事件</typeparam>/// <typeparam name="TH">约束:事件处理器接口<事件></typeparam>public void AddSubscription<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>{var eventName = GetEventKey<T>();DoAddSubscription(typeof(TH), eventName, isDynamic: false);if (!_eventTypes.Contains(typeof(T))){_eventTypes.Add(typeof(T));}}private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic){if (!HasSubscriptionsForEvent(eventName)){_handlers.Add(eventName, new List<SubscriptionInfo>());}if (_handlers[eventName].Any(s => s.HandlerType == handlerType)){throw new ArgumentException($"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));}if (isDynamic){_handlers[eventName].Add(SubscriptionInfo.Dynamic(handlerType));}else{_handlers[eventName].Add(SubscriptionInfo.Typed(handlerType));}}/// <summary>/// 移除动态订阅/// </summary>/// <typeparam name="TH"></typeparam>/// <param name="eventName"></param>public void RemoveDynamicSubscription<TH>(string eventName)where TH : IDynamicIntegrationEventHandler{var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);DoRemoveHandler(eventName, handlerToRemove);}public void RemoveSubscription<T, TH>()where TH : IIntegrationEventHandler<T>where T : IntegrationEvent{var handlerToRemove = FindSubscriptionToRemove<T, TH>();var eventName = GetEventKey<T>();DoRemoveHandler(eventName, handlerToRemove);}private void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove){if (subsToRemove != null){_handlers[eventName].Remove(subsToRemove);if (!_handlers[eventName].Any()){_handlers.Remove(eventName);var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName);if (eventType != null){_eventTypes.Remove(eventType);}RaiseOnEventRemoved(eventName);}}}public IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent{var key = GetEventKey<T>();return GetHandlersForEvent(key);}public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName) => _handlers[eventName];private void RaiseOnEventRemoved(string eventName){var handler = OnEventRemoved;handler?.Invoke(this, eventName);}private SubscriptionInfo FindDynamicSubscriptionToRemove<TH>(string eventName)where TH : IDynamicIntegrationEventHandler{return DoFindSubscriptionToRemove(eventName, typeof(TH));}/// <summary>/// 查询订阅并移除/// </summary>/// <typeparam name="T"></typeparam>/// <typeparam name="TH"></typeparam>/// <returns></returns>private SubscriptionInfo FindSubscriptionToRemove<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>{var eventName = GetEventKey<T>();return DoFindSubscriptionToRemove(eventName, typeof(TH));}private SubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType){if (!HasSubscriptionsForEvent(eventName)){return null;}return _handlers[eventName].SingleOrDefault(s => s.HandlerType == handlerType);}public bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent{var key = GetEventKey<T>();return HasSubscriptionsForEvent(key);}public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName);public string GetEventKey<T>(){return typeof(T).Name;}
}

新建SubscriptionInfo订阅信息模型类
/// <summary>
/// 订阅信息模型
/// </summary>
public class SubscriptionInfo
{public bool IsDynamic { get; }public Type HandlerType { get; }private SubscriptionInfo(bool isDynamic, Type handlerType){IsDynamic = isDynamic;HandlerType = handlerType;}public static SubscriptionInfo Dynamic(Type handlerType){return new SubscriptionInfo(true, handlerType);}public static SubscriptionInfo Typed(Type handlerType){return new SubscriptionInfo(false, handlerType);}
}

新建IRabbitMQPersistentConnection持久化连接接口
/// <summary>
/// RabbitMQ持久连接
/// 接口
/// </summary>
public interface IRabbitMQPersistentConnection: IDisposable
{bool IsConnected { get; }bool TryConnect();IModel CreateModel();
}

新建RabbitMQPersistentConnection实现类
    /// <summary>/// RabbitMQ持久连接/// </summary>public class RabbitMQPersistentConnection: IRabbitMQPersistentConnection{private readonly IConnectionFactory _connectionFactory;private readonly ILogger<RabbitMQPersistentConnection> _logger;private readonly int _retryCount;IConnection _connection;bool _disposed;object sync_root = new object();public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQPersistentConnection> logger,int retryCount = 5){_connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));_logger = logger ?? throw new ArgumentNullException(nameof(logger));_retryCount = retryCount;}/// <summary>/// 是否已连接/// </summary>public bool IsConnected{get{return _connection != null && _connection.IsOpen && !_disposed;}}/// <summary>/// 创建Model/// </summary>/// <returns></returns>public IModel CreateModel(){if (!IsConnected){throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");}return _connection.CreateModel();}/// <summary>/// 释放/// </summary>public void Dispose(){if (_disposed) return;_disposed = true;try{_connection.Dispose();}catch (IOException ex){_logger.LogCritical(ex.ToString());}}/// <summary>/// 连接/// </summary>/// <returns></returns>public bool TryConnect(){_logger.LogInformation("RabbitMQ Client is trying to connect");lock (sync_root){var policy = RetryPolicy.Handle<SocketException>().Or<BrokerUnreachableException>().WaitAndRetry(_retryCount,retryAttempt =>TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>{_logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message);});policy.Execute(() =>{_connection = _connectionFactory.CreateConnection();});if (IsConnected){_connection.ConnectionShutdown += OnConnectionShutdown;_connection.CallbackException += OnCallbackException;_connection.ConnectionBlocked += OnConnectionBlocked;_logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName);return true;}else{_logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");return false;}}}/// <summary>/// 连接被阻断/// </summary>/// <param name="sender"></param>/// <param name="e"></param>private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e){if (_disposed) return;_logger.LogWarning("A RabbitMQ connection is shutdown. Trying to re-connect...");TryConnect();}/// <summary>/// 连接出现异常/// </summary>/// <param name="sender"></param>/// <param name="e"></param>void OnCallbackException(object sender, CallbackExceptionEventArgs e){if (_disposed) return;_logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");TryConnect();}/// <summary>/// 连接被关闭/// </summary>/// <param name="sender"></param>/// <param name="reason"></param>void OnConnectionShutdown(object sender, ShutdownEventArgs reason){if (_disposed) return;_logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");TryConnect();}}

新建EventBusRabbitMQ类

实现消息的发布、订阅、取消订阅接口。

/// <summary>
/// 基于RabbitMQ的事件总线
/// </summary>
public class EventBusRabbitMQ : IEventBus, IDisposable
{const string BROKER_NAME = "core_event_bus";private readonly IRabbitMQPersistentConnection _persistentConnection;private readonly ILogger<EventBusRabbitMQ> _logger;private readonly IEventBusSubscriptionsManager _subsManager;private readonly ILifetimeScope _autofac;private readonly string AUTOFAC_SCOPE_NAME = "core_event_bus";private readonly int _retryCount;private IModel _consumerChannel;private string _queueName;/// <summary>/// RabbitMQ事件总线/// </summary>/// <param name="persistentConnection">RabbitMQ持久连接</param>/// <param name="logger">日志</param>/// <param name="autofac">autofac容器</param>/// <param name="subsManager">事件总线订阅管理器</param>/// <param name="queueName">队列名称</param>/// <param name="retryCount">重试次数</param>public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5){_persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));_logger = logger ?? throw new ArgumentNullException(nameof(logger));_subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();_queueName = queueName;_consumerChannel = CreateConsumerChannel();_autofac = autofac;_retryCount = retryCount;_subsManager.OnEventRemoved += SubsManager_OnEventRemoved;}/// <summary>/// 订阅管理器事件/// </summary>/// <param name="sender"></param>/// <param name="eventName"></param>private void SubsManager_OnEventRemoved(object sender, string eventName){if (!_persistentConnection.IsConnected){_persistentConnection.TryConnect();}using (var channel = _persistentConnection.CreateModel()){channel.QueueUnbind(queue: _queueName,exchange: BROKER_NAME,routingKey: eventName);if (_subsManager.IsEmpty){_queueName = string.Empty;_consumerChannel.Close();}}}/// <summary>/// 发布/// </summary>/// <param name="event">事件模型</param>public void Publish(IntegrationEvent @event){if (!_persistentConnection.IsConnected){_persistentConnection.TryConnect();}var policy = RetryPolicy.Handle<BrokerUnreachableException>().Or<SocketException>().WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>{_logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message);});var eventName = @event.GetType().Name;_logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName);using (var channel = _persistentConnection.CreateModel()){_logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");var message = JsonConvert.SerializeObject(@event);var body = Encoding.UTF8.GetBytes(message);policy.Execute(() =>{var properties = channel.CreateBasicProperties();properties.DeliveryMode = 2; // persistent_logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id);channel.BasicPublish(exchange: BROKER_NAME,routingKey: eventName,mandatory: true,basicProperties: properties,body: body);});}}/// <summary>/// 订阅/// 动态/// </summary>/// <typeparam name="TH">事件处理器</typeparam>/// <param name="eventName">事件名</param>public void SubscribeDynamic<TH>(string eventName)where TH : IDynamicIntegrationEventHandler{_logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());DoInternalSubscription(eventName);_subsManager.AddDynamicSubscription<TH>(eventName);StartBasicConsume();}/// <summary>/// 订阅/// </summary>/// <typeparam name="T">约束:事件模型</typeparam>/// <typeparam name="TH">约束:事件处理器<事件模型></typeparam>public void Subscribe<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>{var eventName = _subsManager.GetEventKey<T>();DoInternalSubscription(eventName);_logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());ConsoleHelper.WriteSuccessLine($"Subscribing to event {eventName} with {typeof(TH).GetGenericTypeName()}");_subsManager.AddSubscription<T, TH>();StartBasicConsume();}private void DoInternalSubscription(string eventName){var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);if (!containsKey){if (!_persistentConnection.IsConnected){_persistentConnection.TryConnect();}using (var channel = _persistentConnection.CreateModel()){channel.QueueBind(queue: _queueName,exchange: BROKER_NAME,routingKey: eventName);}}}/// <summary>/// 取消订阅/// </summary>/// <typeparam name="T"></typeparam>/// <typeparam name="TH"></typeparam>public void Unsubscribe<T, TH>()where T : IntegrationEventwhere TH : IIntegrationEventHandler<T>{var eventName = _subsManager.GetEventKey<T>();_logger.LogInformation("Unsubscribing from event {EventName}", eventName);_subsManager.RemoveSubscription<T, TH>();}public void UnsubscribeDynamic<TH>(string eventName)where TH : IDynamicIntegrationEventHandler{_subsManager.RemoveDynamicSubscription<TH>(eventName);}public void Dispose(){if (_consumerChannel != null){_consumerChannel.Dispose();}_subsManager.Clear();}/// <summary>/// 开始基本消费/// </summary>private void StartBasicConsume(){_logger.LogTrace("Starting RabbitMQ basic consume");if (_consumerChannel != null){var consumer = new AsyncEventingBasicConsumer(_consumerChannel);consumer.Received += Consumer_Received;_consumerChannel.BasicConsume(queue: _queueName,autoAck: false,consumer: consumer);}else{_logger.LogError("StartBasicConsume can't call on _consumerChannel == null");}}/// <summary>/// 消费者接受到/// </summary>/// <param name="sender"></param>/// <param name="eventArgs"></param>/// <returns></returns>private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs){var eventName = eventArgs.RoutingKey;var message = Encoding.UTF8.GetString(eventArgs.Body.Span);try{if (message.ToLowerInvariant().Contains("throw-fake-exception")){throw new InvalidOperationException($"Fake exception requested: \"{message}\"");}await ProcessEvent(eventName, message);}catch (Exception ex){_logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message);}// Even on exception we take the message off the queue.// in a REAL WORLD app this should be handled with a Dead Letter Exchange (DLX). // For more information see: https://www.rabbitmq.com/dlx.html_consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);}/// <summary>/// 创造消费通道/// </summary>/// <returns></returns>private IModel CreateConsumerChannel(){if (!_persistentConnection.IsConnected){_persistentConnection.TryConnect();}_logger.LogTrace("Creating RabbitMQ consumer channel");var channel = _persistentConnection.CreateModel();channel.ExchangeDeclare(exchange: BROKER_NAME,type: "direct");channel.QueueDeclare(queue: _queueName,durable: true,exclusive: false,autoDelete: false,arguments: null);channel.CallbackException += (sender, ea) =>{_logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel");_consumerChannel.Dispose();_consumerChannel = CreateConsumerChannel();StartBasicConsume();};return channel;}private async Task ProcessEvent(string eventName, string message){_logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName);if (_subsManager.HasSubscriptionsForEvent(eventName)){using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME)){var subscriptions = _subsManager.GetHandlersForEvent(eventName);foreach (var subscription in subscriptions){if (subscription.IsDynamic){var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;if (handler == null) continue;dynamic eventData = JObject.Parse(message);await Task.Yield();await handler.Handle(eventData);}else{var handler = scope.ResolveOptional(subscription.HandlerType);if (handler == null) continue;var eventType = _subsManager.GetEventTypeByName(eventName);var integrationEvent = JsonConvert.DeserializeObject(message, eventType);var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);await Task.Yield();await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });}}}}else{_logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName);}}
}

新建事件处理程序实现类

继承IntegrationEvent基类

public class UserQueryIntegrationEvent : IntegrationEvent
{public string UserId { get; private set; }public UserQueryIntegrationEvent(string userid)=> UserId = userid;
}
public class UserQueryIntegrationEventHandler : IIntegrationEventHandler<UserQueryIntegrationEvent>
{private readonly IUserServices _userServices;private readonly ILogger<UserQueryIntegrationEventHandler> _logger;public UserQueryIntegrationEventHandler(IUserServices userServices,ILogger<UserQueryIntegrationEventHandler> logger){_userServices = userServices;_logger = logger ?? throw new ArgumentNullException(nameof(logger));}public async Task Handle(UserQueryIntegrationEvent @event){_logger.LogInformation("----- Handling integration event: {IntegrationEventId} at {AppName} - ({@IntegrationEvent})", @event.Id, "Core", @event);ConsoleHelper.WriteSuccessLine($"----- Handling integration event: {@event.Id} at Core - ({@event})");await _userServices.QueryById(@event.UserId.ToString());}}

依赖注入EventBus服务
/// <summary>
/// EventBus 事件总线服务
/// </summary>
public static class EventBusSetup
{public static void AddEventBusSetup(this IServiceCollection services){if (services == null) throw new ArgumentNullException(nameof(services));if (AppSettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool()){var subscriptionClientName = AppSettings.app(new string[] { "EventBus", "SubscriptionClientName" });services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();services.AddTransient<UserQueryIntegrationEventHandler>();if (AppSettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool()){services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>{var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();var retryCount = 5;if (!string.IsNullOrEmpty(AppSettings.app(new string[] { "RabbitMQ", "RetryCount" }))){retryCount = int.Parse(AppSettings.app(new string[] { "RabbitMQ", "RetryCount" }));}return new EventBusRabbitMQ(rabbitMQPersistentConnection, logger, iLifetimeScope, eventBusSubcriptionsManager, subscriptionClientName, retryCount);});}}}
}
builder.Services.AddEventBusSetup();

创建EventBus后台任务
public class EventBusHostedService : IHostedService
{private readonly IServiceProvider _serviceProvider;private readonly ILogger<EventBusHostedService> _logger;public EventBusHostedService(IServiceProvider serviceProvider, ILogger<EventBusHostedService> logger){_serviceProvider = serviceProvider;_logger = logger;}public async Task StartAsync(CancellationToken cancellationToken){_logger.LogInformation("Start EventBus Service!");await DoWork();}private Task DoWork(){if (AppSettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool()){var eventBus = _serviceProvider.GetRequiredService<IEventBus>();eventBus.Subscribe<BlogQueryIntegrationEvent, BlogQueryIntegrationEventHandler>();}return Task.CompletedTask;}public Task StopAsync(CancellationToken cancellationToken){_logger.LogInformation("Stop EventBus Service!");return Task.CompletedTask;}
}
builder.Services.AddHostedService<EventBusHostedService>()

配置host
builder.Host.UseServiceProviderFactory(new AutofacServiceProviderFactory())

测试
/// <summary>
/// 测试RabbitMQ事件总线
/// </summary>
/// <param name="_eventBus"></param>
/// <param name="userId"></param>
/// <returns></returns>
[HttpGet]
[AllowAnonymous]
public void EventBusTry([FromServices] IEventBus _eventBus, string userId = "1")
{var deletedEvent = new UserQueryIntegrationEvent(userId);_eventBus.Publish(deletedEvent);
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/739208.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

用StabilityMatrix一键安装Stable Diffusion

Stable Diffusion是2022年发布的深度学习文字到图像生成模型,它既能免费使用,又能部署在本地端,又有非常多的模型可以直接套用,在使用体验上比Midjourney和DALL-E更加强大。Stable Diffusion使用的模型有下列几大类,对照模型网站 https://civitai.com 以形成更直观的认识:…

「代码随想录算法训练营」第四天 | 链表 part2

24.两两交换链表中的节点题目链接:https://leetcode.cn/problems/swap-nodes-in-pairs/ 题目难度:中等 文章讲解:https://programmercarl.com/0024.两两交换链表中的节点.html#算法公开课 视频讲解: https://www.bilibili.com/video/BV1YT411g7br 题目状态:有思路,但细节…

【0基础学爬虫】爬虫框架之 feapder 的使用

前言 大数据时代,各行各业对数据采集的需求日益增多,网络爬虫的运用也更为广泛,越来越多的人开始学习网络爬虫这项技术,K哥爬虫此前已经推出不少爬虫进阶、逆向相关文章,为实现从易到难全方位覆盖,特设【0基础学爬虫】专栏,帮助小白快速入门爬虫。 学习爬虫的过程中,一…

PTA题目集7-8的总结

PTA题目集7-8的总结 1.前言: 2.设计与分析: 3.踩坑心得: 4.改进意见: 5.总结 1.前言:PTA题目集7新增了互斥开关,窗帘,多并联电路和多串联电路。由于之前的输入信息中设备的引脚没有作用,所以我的正则表达式只用来提取设备的名字。而互斥开关有三个引脚,不同引脚的电压…

pycharm 动态绘图

_tkinter.TclError: Cant find a usable init.tcl in the following directories:解决办法: 1 打开报错地址所在文件 D:/Program Files/METACOMP/mlib/tcltk8/lib/tcl8.4/init.tcl: version conflict for package "Tcl": have 8.6.9, need exactly 8.4 2 修改init.t…

从零开始教你写一个MLIR Pass

笔者在去年写了一篇LLVM Pass的教程,现在从事MLIR的开发近1年了,写点教程回馈下社区。 MLIR(Multi-Level Intermediate Representation,多层中间表示)是LLVM之父(博士期间开发的LLVM)的Chris Lattner带领团队开发的编译器基础设施,其增强了 LLVM IR表达能力,而且其是关注P…

idm下载

获得百度网盘直链下载链接调用idm谷歌插件获得城盘直链下载链接调用idm谷歌插件 https://ctfile.qinlili.bid/当你的才华配不上你的野心,努力的时候到了!

Arthas进阶-笔记

《Arthas进阶》 学习目标类和类加载器相关的命令 monitor/watch/trace/stack等核心命令的使用 火焰图的生成 Arthas实战案例dump 作用 将已加载类的字节码文件保存到特定目录:logs/arthas/classdump/ 参数数名称 参数说明class-pattern 类名表达式匹配[c:] 类所属 ClassLoader…

域名、备案和HTTPS

有了域名后,可以方便其他人记住并访问12.域名、备案和HTTPS 有了域名后,可以方便其他人记住并访问,历史上不乏大企业花大价钱购买域名的:京东域名换成 JD.com,并且说是为了防止百度吸引流量,为什么? 唯品会买下域名 VIP.COM 或花费千万‍ 域名提供商 如果想要域名,得去…

陪玩app源码,加密算法中密钥生成和读取一览

陪玩app源码,加密算法中密钥生成和读取一览密钥生成与读取密码学随机数密码学随机数算法在安全场景中使用广泛,如:生成对称密钥、盐、iv等,因此相比普通的随机数算法(如线性同余),它需要更高强度的不可预测性,在Java中,使用SecureRandom来生成更安全的随机数,如下:pub…

陪玩小程序源码,不容错过的加密算法整理清单

陪玩小程序源码,不容错过的加密算法整理清单在开发陪玩小程序源码时,可采用的加密算法类型包含:对称加密对称加密算法,使用Cipher类即可,以广泛使用的AES为例,如下:public byte[] encrypt(byte[] data, Key key) {try {Cipher cipher = Cipher.getInstance("AES/CB…

【QT】工程库引用

创建多工程项目创建子项目UI窗体项目创建库工程项目引用库工程添加日志输出类5.1 需要添加特殊配置,否则编译会报错5.2 正确添加配置5.3 日志正常输出5.4 如果缺少5.1步骤,则报如下错误5.5 如果添加了5.1步骤,还是报The process was ended forcefully 找到项目文件,把debug…