定义
事件总线(EventBus)是一种用来统一管理所有事件的机制,是事件发布、事件存储、事件订阅和事件处理的统称。
作用
实现微服务之间的解耦和消息传递,它允许微服务之间通过发送和订阅事件来进行通信,而不需要彼此了解,不需要直接调用彼此的API或方法。
具体功能如下
- 解耦微服务:通过使用EventBus,微服务之间的通信可以变得松散耦合,无需相互依赖。发送事件的微服务不需要知道哪些微服务会订阅该事件,也无需关心事件的处理方式。相反,订阅事件的微服务只需要订阅感兴趣的事件,而不需要关心事件是如何产生的、是谁发送了事件。
- 异步通信:使用EventBus可以实现异步通信模式。当某个微服务发送事件时,不会被阻塞等待其他微服务的响应。这样可以提高系统的性能和并发处理能力。
- 可扩展性:通过EventBus,可以轻松地添加新的微服务或移除现有的微服务,而不会对系统的其他部分产生影响。这极大地提高了系统的可扩展性和灵活性。
- 事件驱动架构:EventBus可以帮助实现事件驱动架构(Event-driven Architecture),其中各个微服务通过发布和订阅事件来进行通信和协作。这种架构模式可以提供更高的灵活性、可维护性和可扩展性。
.NET 6 使用 RabbitMQ 实现基于事件总线(EventBus)的通信
Nuget包安装信息
RabbitMQ.Client、Polly、Newtonsoft.Json、Autofac.Extensions.DependencyInjection、Autofac.Extras.DynamicProxy
配置RabbitMQ节点信息
"RabbitMQ": {"Enabled": true,"Connection": "localhost","UserName": "guest","Password": "guest","Port": 5672,"RetryCount": 3
},
"EventBus": {"Enabled": true,"SubscriptionClientName": "Net6.Core"
}
通过连接工厂初始化RabbitMQ配置信息
/// <summary>
/// Service-collection extension that registers the RabbitMQ persistent connection.
/// </summary>
public static class RabbitMQSetup
{
    /// <summary>
    /// Registers <see cref="IRabbitMQPersistentConnection"/> as a singleton when the
    /// "RabbitMQ:Enabled" setting is true; otherwise registers nothing.
    /// </summary>
    /// <param name="services">The service collection to add the registration to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="services"/> is null.</exception>
    public static void AddRabbitMQSetup(this IServiceCollection services)
    {
        if (services == null) throw new ArgumentNullException(nameof(services));

        if (!AppSettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool())
        {
            return;
        }

        services.AddSingleton<IRabbitMQPersistentConnection>(sp =>
        {
            var logger = sp.GetRequiredService<ILogger<RabbitMQPersistentConnection>>();

            // Inside an object initializer the members are named directly; the previous
            // "factory.UserName = ..." form referenced the variable being declared and
            // did not compile.
            var factory = new ConnectionFactory()
            {
                HostName = AppSettings.app(new string[] { "RabbitMQ", "Connection" }),
                // The event bus consumes with an async consumer, which needs async dispatch.
                DispatchConsumersAsync = true,
                UserName = AppSettings.app(new string[] { "RabbitMQ", "UserName" }),
                Password = AppSettings.app(new string[] { "RabbitMQ", "Password" }),
                Port = AppSettings.app(new string[] { "RabbitMQ", "Port" }).ObjToInt(),
            };

            int retryCount = AppSettings.app(new string[] { "RabbitMQ", "RetryCount" }).ObjToInt();

            return new RabbitMQPersistentConnection(factory, logger, retryCount);
        });
    }
}
注入RabbitMQ连接配置服务
// Register the RabbitMQ persistent connection (nothing is registered unless "RabbitMQ:Enabled" is true).
builder.Services.AddRabbitMQSetup();
新建IntegrationEvent类
/// <summary>
/// Base class for every event carried over the event bus.
/// Holds the event identifier and the creation timestamp.
/// </summary>
public class IntegrationEvent
{
    /// <summary>
    /// Creates an event with a newly generated id, stamped with the current UTC time.
    /// </summary>
    public IntegrationEvent()
        : this(Guid.NewGuid(), DateTime.UtcNow)
    {
    }

    /// <summary>
    /// Creates an event from explicit values; marked as the constructor used by the JSON serializer.
    /// </summary>
    /// <param name="id">Event identifier.</param>
    /// <param name="createDate">Creation timestamp of the event.</param>
    [JsonConstructor]
    public IntegrationEvent(Guid id, DateTime createDate)
    {
        Id = id;
        CreationDate = createDate;
    }

    /// <summary>Unique identifier of this event.</summary>
    [JsonProperty]
    public Guid Id { get; private set; }

    /// <summary>Timestamp at which this event was created.</summary>
    [JsonProperty]
    public DateTime CreationDate { get; private set; }
}
新建IIntegrationEventHandler接口
定义事件处理程序接口
/// <summary>
/// Strongly typed integration event handler.
/// </summary>
/// <typeparam name="TIntegrationEvent">
/// The event type this handler processes; must derive from <see cref="IntegrationEvent"/>.
/// </typeparam>
public interface IIntegrationEventHandler<in TIntegrationEvent> : IIntegrationEventHandler
    where TIntegrationEvent : IntegrationEvent
{
    /// <summary>
    /// Handles a single occurrence of the event.
    /// </summary>
    /// <param name="event">The event instance to handle.</param>
    /// <returns>A task that completes when handling has finished.</returns>
    Task Handle(TIntegrationEvent @event);
}

/// <summary>
/// Non-generic base interface shared by all typed integration event handlers.
/// It declares no members.
/// </summary>
public interface IIntegrationEventHandler
{
}
新建IDynamicIntegrationEventHandler接口
/// <summary>
/// Handler for events whose payload is passed as a <c>dynamic</c> object
/// rather than as a typed event class.
/// </summary>
public interface IDynamicIntegrationEventHandler
{
    /// <summary>
    /// Handles a single event payload.
    /// </summary>
    /// <param name="eventData">The event payload, accessed dynamically.</param>
    /// <returns>A task that completes when handling has finished.</returns>
    Task Handle(dynamic eventData);
}
新建IEventBus接口
/// <summary>
/// Event bus contract: publish events and manage typed or dynamic subscriptions.
/// </summary>
public interface IEventBus
{
    /// <summary>
    /// Publishes an event.
    /// </summary>
    /// <param name="event">The event to publish; its class must derive from <see cref="IntegrationEvent"/>.</param>
    void Publish(IntegrationEvent @event);

    /// <summary>
    /// Subscribes a typed handler to an event type.
    /// </summary>
    /// <typeparam name="T">The event type to subscribe to.</typeparam>
    /// <typeparam name="TH">The handler type; must implement <see cref="IIntegrationEventHandler{T}"/>.</typeparam>
    void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>;

    /// <summary>
    /// Removes a typed handler's subscription to an event type.
    /// </summary>
    /// <typeparam name="T">The event type that was subscribed to.</typeparam>
    /// <typeparam name="TH">The handler type; must implement <see cref="IIntegrationEventHandler{T}"/>.</typeparam>
    void Unsubscribe<T, TH>()
        where TH : IIntegrationEventHandler<T>
        where T : IntegrationEvent;

    /// <summary>
    /// Subscribes a dynamic handler to an event identified by name.
    /// </summary>
    /// <typeparam name="TH">The handler type; must implement <see cref="IDynamicIntegrationEventHandler"/>.</typeparam>
    /// <param name="eventName">Name of the event to subscribe to.</param>
    void SubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    /// <summary>
    /// Removes a dynamic handler's subscription to an event identified by name.
    /// </summary>
    /// <typeparam name="TH">The handler type; must implement <see cref="IDynamicIntegrationEventHandler"/>.</typeparam>
    /// <param name="eventName">Name of the event that was subscribed to.</param>
    void UnsubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;
}
新建IEventBusSubscriptionsManager接口
/// <summary>
/// Contract for the registry that tracks which handler types are subscribed to which events.
/// </summary>
public interface IEventBusSubscriptionsManager
{
    /// <summary>True when no event has any registered handler.</summary>
    bool IsEmpty { get; }

    /// <summary>Raised with the event name when the last handler of an event is removed.</summary>
    event EventHandler<string> OnEventRemoved;

    /// <summary>Registers a dynamic handler for the named event.</summary>
    /// <typeparam name="TH">Dynamic handler type.</typeparam>
    /// <param name="eventName">Name of the event.</param>
    void AddDynamicSubscription<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    /// <summary>Registers a typed handler for an event type.</summary>
    /// <typeparam name="T">Event type.</typeparam>
    /// <typeparam name="TH">Typed handler type for <typeparamref name="T"/>.</typeparam>
    void AddSubscription<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>;

    /// <summary>Removes a typed handler registration.</summary>
    /// <typeparam name="T">Event type.</typeparam>
    /// <typeparam name="TH">Typed handler type for <typeparamref name="T"/>.</typeparam>
    void RemoveSubscription<T, TH>()
        where TH : IIntegrationEventHandler<T>
        where T : IntegrationEvent;

    /// <summary>Removes a dynamic handler registration for the named event.</summary>
    /// <typeparam name="TH">Dynamic handler type.</typeparam>
    /// <param name="eventName">Name of the event.</param>
    void RemoveDynamicSubscription<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler;

    /// <summary>Tells whether the event type has at least one registered handler.</summary>
    /// <typeparam name="T">Event type.</typeparam>
    bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent;

    /// <summary>Tells whether the named event has at least one registered handler.</summary>
    /// <param name="eventName">Name of the event.</param>
    bool HasSubscriptionsForEvent(string eventName);

    /// <summary>Looks up a subscribed event type by its name.</summary>
    /// <param name="eventName">Name of the event.</param>
    Type GetEventTypeByName(string eventName);

    /// <summary>Removes all registrations.</summary>
    void Clear();

    /// <summary>Returns the handler registrations for an event type.</summary>
    /// <typeparam name="T">Event type.</typeparam>
    IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent;

    /// <summary>Returns the handler registrations for the named event.</summary>
    /// <param name="eventName">Name of the event.</param>
    IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName);

    /// <summary>Returns the key under which the event type is registered.</summary>
    /// <typeparam name="T">Event type.</typeparam>
    string GetEventKey<T>();
}
新建InMemoryEventBusSubscriptionsManager类
/// <summary>
/// In-memory subscriptions manager for the event bus.
/// Maps each event name to the handler types registered for it and remembers the
/// strongly typed event types that were subscribed.
/// </summary>
public partial class InMemoryEventBusSubscriptionsManager : IEventBusSubscriptionsManager
{
    // Event name -> handler registrations for that event.
    private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;

    // Event types registered through the typed AddSubscription overload.
    private readonly List<Type> _eventTypes;

    /// <summary>Raised with the event name once the last handler of an event is removed.</summary>
    public event EventHandler<string> OnEventRemoved;

    /// <summary>Creates an empty manager.</summary>
    public InMemoryEventBusSubscriptionsManager()
    {
        _handlers = new Dictionary<string, List<SubscriptionInfo>>();
        _eventTypes = new List<Type>();
    }

    /// <summary>True when no event has any registered handler.</summary>
    public bool IsEmpty => _handlers.Count == 0;

    /// <summary>
    /// Removes every registration. The remembered event types are cleared too, so
    /// <see cref="GetEventTypeByName"/> no longer resolves types that have no handlers left.
    /// </summary>
    public void Clear()
    {
        _handlers.Clear();
        _eventTypes.Clear();
    }

    /// <summary>Registers a dynamic handler for the named event.</summary>
    /// <typeparam name="TH">Dynamic handler type.</typeparam>
    /// <param name="eventName">Name of the event.</param>
    /// <exception cref="ArgumentException">The handler type is already registered for the event.</exception>
    public void AddDynamicSubscription<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler
    {
        DoAddSubscription(typeof(TH), eventName, isDynamic: true);
    }

    /// <summary>Registers a typed handler for an event type.</summary>
    /// <typeparam name="T">Event type.</typeparam>
    /// <typeparam name="TH">Typed handler type for <typeparamref name="T"/>.</typeparam>
    /// <exception cref="ArgumentException">The handler type is already registered for the event.</exception>
    public void AddSubscription<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = GetEventKey<T>();

        DoAddSubscription(typeof(TH), eventName, isDynamic: false);

        if (!_eventTypes.Contains(typeof(T)))
        {
            _eventTypes.Add(typeof(T));
        }
    }

    // Adds one registration, creating the per-event list on first use and rejecting duplicates.
    private void DoAddSubscription(Type handlerType, string eventName, bool isDynamic)
    {
        if (!_handlers.TryGetValue(eventName, out var subscriptions))
        {
            subscriptions = new List<SubscriptionInfo>();
            _handlers.Add(eventName, subscriptions);
        }

        if (subscriptions.Any(s => s.HandlerType == handlerType))
        {
            throw new ArgumentException($"Handler Type {handlerType.Name} already registered for '{eventName}'", nameof(handlerType));
        }

        subscriptions.Add(isDynamic
            ? SubscriptionInfo.Dynamic(handlerType)
            : SubscriptionInfo.Typed(handlerType));
    }

    /// <summary>Removes a dynamic handler registration for the named event.</summary>
    /// <typeparam name="TH">Dynamic handler type.</typeparam>
    /// <param name="eventName">Name of the event.</param>
    public void RemoveDynamicSubscription<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler
    {
        var handlerToRemove = FindDynamicSubscriptionToRemove<TH>(eventName);
        DoRemoveHandler(eventName, handlerToRemove);
    }

    /// <summary>Removes a typed handler registration.</summary>
    /// <typeparam name="T">Event type.</typeparam>
    /// <typeparam name="TH">Typed handler type for <typeparamref name="T"/>.</typeparam>
    public void RemoveSubscription<T, TH>()
        where TH : IIntegrationEventHandler<T>
        where T : IntegrationEvent
    {
        var handlerToRemove = FindSubscriptionToRemove<T, TH>();
        var eventName = GetEventKey<T>();
        DoRemoveHandler(eventName, handlerToRemove);
    }

    // Removes the registration; when it was the last one for the event, drops the event
    // entry and its remembered type, then raises OnEventRemoved.
    private void DoRemoveHandler(string eventName, SubscriptionInfo subsToRemove)
    {
        if (subsToRemove == null)
        {
            return;
        }

        var subscriptions = _handlers[eventName];
        subscriptions.Remove(subsToRemove);

        if (subscriptions.Any())
        {
            return;
        }

        _handlers.Remove(eventName);

        var eventType = _eventTypes.SingleOrDefault(e => e.Name == eventName);
        if (eventType != null)
        {
            _eventTypes.Remove(eventType);
        }

        RaiseOnEventRemoved(eventName);
    }

    /// <summary>Returns the handler registrations for an event type.</summary>
    /// <typeparam name="T">Event type.</typeparam>
    public IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>() where T : IntegrationEvent
    {
        var key = GetEventKey<T>();
        return GetHandlersForEvent(key);
    }

    /// <summary>
    /// Returns the handler registrations for the named event.
    /// Uses the dictionary indexer, so an event name with no registrations throws
    /// <see cref="KeyNotFoundException"/>.
    /// </summary>
    /// <param name="eventName">Name of the event.</param>
    public IEnumerable<SubscriptionInfo> GetHandlersForEvent(string eventName) => _handlers[eventName];

    private void RaiseOnEventRemoved(string eventName)
    {
        var handler = OnEventRemoved;
        handler?.Invoke(this, eventName);
    }

    private SubscriptionInfo FindDynamicSubscriptionToRemove<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler
    {
        return DoFindSubscriptionToRemove(eventName, typeof(TH));
    }

    // Looks up the registration of TH for event T; returns null when there is none.
    private SubscriptionInfo FindSubscriptionToRemove<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = GetEventKey<T>();
        return DoFindSubscriptionToRemove(eventName, typeof(TH));
    }

    private SubscriptionInfo DoFindSubscriptionToRemove(string eventName, Type handlerType)
    {
        if (!_handlers.TryGetValue(eventName, out var subscriptions))
        {
            return null;
        }

        return subscriptions.SingleOrDefault(s => s.HandlerType == handlerType);
    }

    /// <summary>Tells whether the event type has at least one registered handler.</summary>
    /// <typeparam name="T">Event type.</typeparam>
    public bool HasSubscriptionsForEvent<T>() where T : IntegrationEvent
    {
        var key = GetEventKey<T>();
        return HasSubscriptionsForEvent(key);
    }

    /// <summary>Tells whether the named event has at least one registered handler.</summary>
    /// <param name="eventName">Name of the event.</param>
    public bool HasSubscriptionsForEvent(string eventName) => _handlers.ContainsKey(eventName);

    /// <summary>Returns the subscribed event type whose simple name matches, or null when none does.</summary>
    /// <param name="eventName">Name of the event.</param>
    public Type GetEventTypeByName(string eventName) => _eventTypes.SingleOrDefault(t => t.Name == eventName);

    /// <summary>Returns the registration key of an event type: its simple type name.</summary>
    /// <typeparam name="T">Event type.</typeparam>
    public string GetEventKey<T>()
    {
        return typeof(T).Name;
    }
}
新建SubscriptionInfo订阅信息模型类
/// <summary>
/// Describes one handler registration: the handler type and whether it is a dynamic handler.
/// Instances are created through <see cref="Dynamic"/> and <see cref="Typed"/>.
/// </summary>
public class SubscriptionInfo
{
    private SubscriptionInfo(bool isDynamic, Type handlerType)
    {
        HandlerType = handlerType;
        IsDynamic = isDynamic;
    }

    /// <summary>True for a dynamic handler registration, false for a typed one.</summary>
    public bool IsDynamic { get; }

    /// <summary>The registered handler type.</summary>
    public Type HandlerType { get; }

    /// <summary>Creates a registration flagged as dynamic.</summary>
    /// <param name="handlerType">The handler type.</param>
    public static SubscriptionInfo Dynamic(Type handlerType)
        => new SubscriptionInfo(isDynamic: true, handlerType: handlerType);

    /// <summary>Creates a registration flagged as typed (not dynamic).</summary>
    /// <param name="handlerType">The handler type.</param>
    public static SubscriptionInfo Typed(Type handlerType)
        => new SubscriptionInfo(isDynamic: false, handlerType: handlerType);
}
新建IRabbitMQPersistentConnection持久化连接接口
/// <summary>
/// Contract for a long-lived RabbitMQ connection from which channels are created.
/// </summary>
public interface IRabbitMQPersistentConnection : IDisposable
{
    /// <summary>Whether a usable connection is currently available.</summary>
    bool IsConnected { get; }

    /// <summary>Attempts to establish the connection.</summary>
    /// <returns>True when the connection attempt succeeded; otherwise false.</returns>
    bool TryConnect();

    /// <summary>Creates a new channel on the connection.</summary>
    IModel CreateModel();
}
新建RabbitMQPersistentConnection实现类
/// <summary>
/// Persistent RabbitMQ connection: connects with retries and tries to re-connect
/// when the connection reports shutdown, blocking or a callback exception.
/// </summary>
public class RabbitMQPersistentConnection : IRabbitMQPersistentConnection
{
    private readonly IConnectionFactory _connectionFactory;
    private readonly ILogger<RabbitMQPersistentConnection> _logger;
    private readonly int _retryCount;

    // Serializes connection attempts made by TryConnect.
    private readonly object _syncRoot = new object();

    private IConnection _connection;
    private bool _disposed;

    /// <summary>
    /// Creates the wrapper; no connection is opened until <see cref="TryConnect"/> is called.
    /// </summary>
    /// <param name="connectionFactory">Factory used to create the underlying connection.</param>
    /// <param name="logger">Logger.</param>
    /// <param name="retryCount">Number of retries for a connection attempt.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="connectionFactory"/> or <paramref name="logger"/> is null.
    /// </exception>
    public RabbitMQPersistentConnection(IConnectionFactory connectionFactory, ILogger<RabbitMQPersistentConnection> logger, int retryCount = 5)
    {
        _connectionFactory = connectionFactory ?? throw new ArgumentNullException(nameof(connectionFactory));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _retryCount = retryCount;
    }

    /// <summary>
    /// True when a connection exists, is open, and this instance has not been disposed.
    /// </summary>
    public bool IsConnected
    {
        get
        {
            return _connection != null && _connection.IsOpen && !_disposed;
        }
    }

    /// <summary>
    /// Creates a channel on the current connection.
    /// </summary>
    /// <exception cref="InvalidOperationException">No open connection is available.</exception>
    public IModel CreateModel()
    {
        if (!IsConnected)
        {
            throw new InvalidOperationException("No RabbitMQ connections are available to perform this action");
        }

        return _connection.CreateModel();
    }

    /// <summary>
    /// Disposes the underlying connection, if one was ever created. Safe to call repeatedly.
    /// </summary>
    public void Dispose()
    {
        if (_disposed) return;

        _disposed = true;

        try
        {
            // The connection is only created by TryConnect; disposing before any
            // successful connect must not throw a NullReferenceException.
            _connection?.Dispose();
        }
        catch (IOException ex)
        {
            _logger.LogCritical(ex.ToString());
        }
    }

    /// <summary>
    /// Creates the connection, retrying with exponential back-off (2^attempt seconds)
    /// on <see cref="SocketException"/> and <c>BrokerUnreachableException</c>, then
    /// subscribes to the connection's failure events.
    /// </summary>
    /// <returns>True when the connection is open afterwards; otherwise false.</returns>
    public bool TryConnect()
    {
        _logger.LogInformation("RabbitMQ Client is trying to connect");

        lock (_syncRoot)
        {
            var policy = RetryPolicy.Handle<SocketException>()
                .Or<BrokerUnreachableException>()
                .WaitAndRetry(_retryCount,
                    retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)),
                    (ex, time) =>
                    {
                        _logger.LogWarning(ex, "RabbitMQ Client could not connect after {TimeOut}s ({ExceptionMessage})", $"{time.TotalSeconds:n1}", ex.Message);
                    });

            policy.Execute(() =>
            {
                _connection = _connectionFactory.CreateConnection();
            });

            if (IsConnected)
            {
                _connection.ConnectionShutdown += OnConnectionShutdown;
                _connection.CallbackException += OnCallbackException;
                _connection.ConnectionBlocked += OnConnectionBlocked;

                _logger.LogInformation("RabbitMQ Client acquired a persistent connection to '{HostName}' and is subscribed to failure events", _connection.Endpoint.HostName);

                return true;
            }
            else
            {
                _logger.LogCritical("FATAL ERROR: RabbitMQ connections could not be created and opened");

                return false;
            }
        }
    }

    /// <summary>
    /// Handles the connection-blocked notification by attempting to re-connect.
    /// </summary>
    private void OnConnectionBlocked(object sender, ConnectionBlockedEventArgs e)
    {
        if (_disposed) return;

        // The message previously said "is shutdown", which mislabelled this event in the logs.
        _logger.LogWarning("A RabbitMQ connection is blocked. Trying to re-connect...");

        TryConnect();
    }

    /// <summary>
    /// Handles a callback exception raised by the connection by attempting to re-connect.
    /// </summary>
    void OnCallbackException(object sender, CallbackExceptionEventArgs e)
    {
        if (_disposed) return;

        _logger.LogWarning("A RabbitMQ connection throw exception. Trying to re-connect...");

        TryConnect();
    }

    /// <summary>
    /// Handles connection shutdown by attempting to re-connect.
    /// </summary>
    void OnConnectionShutdown(object sender, ShutdownEventArgs reason)
    {
        if (_disposed) return;

        _logger.LogWarning("A RabbitMQ connection is on shutdown. Trying to re-connect...");

        TryConnect();
    }
}
新建EventBusRabbitMQ类
实现消息的发布、订阅、取消订阅接口。
/// <summary>
/// RabbitMQ-based event bus. Publishes events to a direct exchange using the event
/// type name as routing key, binds one queue per subscribed event name, and
/// dispatches received messages to the registered handlers.
/// </summary>
public class EventBusRabbitMQ : IEventBus, IDisposable
{
    // Name of the direct exchange every event is published to.
    const string BROKER_NAME = "core_event_bus";

    private readonly IRabbitMQPersistentConnection _persistentConnection;
    private readonly ILogger<EventBusRabbitMQ> _logger;
    private readonly IEventBusSubscriptionsManager _subsManager;
    private readonly ILifetimeScope _autofac;
    private readonly string AUTOFAC_SCOPE_NAME = "core_event_bus";
    private readonly int _retryCount;

    private IModel _consumerChannel;
    private string _queueName;

    /// <summary>
    /// Creates the event bus and its consumer channel.
    /// </summary>
    /// <param name="persistentConnection">RabbitMQ persistent connection.</param>
    /// <param name="logger">Logger.</param>
    /// <param name="autofac">Autofac lifetime scope used to resolve handlers.</param>
    /// <param name="subsManager">Subscriptions manager; an in-memory one is created when null.</param>
    /// <param name="queueName">Name of the queue to consume from.</param>
    /// <param name="retryCount">Number of retries when publishing.</param>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="persistentConnection"/> or <paramref name="logger"/> is null.
    /// </exception>
    public EventBusRabbitMQ(IRabbitMQPersistentConnection persistentConnection, ILogger<EventBusRabbitMQ> logger,
        ILifetimeScope autofac, IEventBusSubscriptionsManager subsManager, string queueName = null, int retryCount = 5)
    {
        _persistentConnection = persistentConnection ?? throw new ArgumentNullException(nameof(persistentConnection));
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));
        _subsManager = subsManager ?? new InMemoryEventBusSubscriptionsManager();
        _queueName = queueName;
        _consumerChannel = CreateConsumerChannel();
        _autofac = autofac;
        _retryCount = retryCount;
        _subsManager.OnEventRemoved += SubsManager_OnEventRemoved;
    }

    /// <summary>
    /// Reacts to the last handler of an event being removed: unbinds the queue from
    /// that routing key and, when no subscriptions remain, closes the consumer channel.
    /// </summary>
    private void SubsManager_OnEventRemoved(object sender, string eventName)
    {
        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }

        using (var channel = _persistentConnection.CreateModel())
        {
            channel.QueueUnbind(queue: _queueName,
                exchange: BROKER_NAME,
                routingKey: eventName);

            if (_subsManager.IsEmpty)
            {
                _queueName = string.Empty;
                _consumerChannel.Close();
            }
        }
    }

    /// <summary>
    /// Publishes an event as a persistent JSON message, retrying with exponential
    /// back-off on <c>BrokerUnreachableException</c> and <see cref="SocketException"/>.
    /// </summary>
    /// <param name="event">The event to publish; its type name is used as routing key.</param>
    public void Publish(IntegrationEvent @event)
    {
        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }

        var policy = RetryPolicy.Handle<BrokerUnreachableException>()
            .Or<SocketException>()
            .WaitAndRetry(_retryCount, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)), (ex, time) =>
            {
                _logger.LogWarning(ex, "Could not publish event: {EventId} after {Timeout}s ({ExceptionMessage})", @event.Id, $"{time.TotalSeconds:n1}", ex.Message);
            });

        var eventName = @event.GetType().Name;

        _logger.LogTrace("Creating RabbitMQ channel to publish event: {EventId} ({EventName})", @event.Id, eventName);

        using (var channel = _persistentConnection.CreateModel())
        {
            _logger.LogTrace("Declaring RabbitMQ exchange to publish event: {EventId}", @event.Id);

            channel.ExchangeDeclare(exchange: BROKER_NAME, type: "direct");

            var message = JsonConvert.SerializeObject(@event);
            var body = Encoding.UTF8.GetBytes(message);

            policy.Execute(() =>
            {
                var properties = channel.CreateBasicProperties();
                // Delivery mode 2 marks the message as persistent.
                properties.DeliveryMode = 2;

                _logger.LogTrace("Publishing event to RabbitMQ: {EventId}", @event.Id);

                channel.BasicPublish(exchange: BROKER_NAME,
                    routingKey: eventName,
                    mandatory: true,
                    basicProperties: properties,
                    body: body);
            });
        }
    }

    /// <summary>
    /// Subscribes a dynamic handler to the named event and starts consuming.
    /// </summary>
    /// <typeparam name="TH">Dynamic handler type.</typeparam>
    /// <param name="eventName">Name of the event.</param>
    public void SubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler
    {
        _logger.LogInformation("Subscribing to dynamic event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());

        DoInternalSubscription(eventName);
        _subsManager.AddDynamicSubscription<TH>(eventName);
        StartBasicConsume();
    }

    /// <summary>
    /// Subscribes a typed handler to an event type and starts consuming.
    /// </summary>
    /// <typeparam name="T">Event type.</typeparam>
    /// <typeparam name="TH">Typed handler type for <typeparamref name="T"/>.</typeparam>
    public void Subscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = _subsManager.GetEventKey<T>();
        DoInternalSubscription(eventName);

        _logger.LogInformation("Subscribing to event {EventName} with {EventHandler}", eventName, typeof(TH).GetGenericTypeName());

        ConsoleHelper.WriteSuccessLine($"Subscribing to event {eventName} with {typeof(TH).GetGenericTypeName()}");

        _subsManager.AddSubscription<T, TH>();
        StartBasicConsume();
    }

    // Binds the queue to the event's routing key the first time the event is subscribed.
    private void DoInternalSubscription(string eventName)
    {
        var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
        if (containsKey)
        {
            return;
        }

        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }

        using (var channel = _persistentConnection.CreateModel())
        {
            channel.QueueBind(queue: _queueName,
                exchange: BROKER_NAME,
                routingKey: eventName);
        }
    }

    /// <summary>
    /// Removes a typed handler's subscription.
    /// </summary>
    /// <typeparam name="T">Event type.</typeparam>
    /// <typeparam name="TH">Typed handler type for <typeparamref name="T"/>.</typeparam>
    public void Unsubscribe<T, TH>()
        where T : IntegrationEvent
        where TH : IIntegrationEventHandler<T>
    {
        var eventName = _subsManager.GetEventKey<T>();

        _logger.LogInformation("Unsubscribing from event {EventName}", eventName);

        _subsManager.RemoveSubscription<T, TH>();
    }

    /// <summary>
    /// Removes a dynamic handler's subscription to the named event.
    /// </summary>
    /// <typeparam name="TH">Dynamic handler type.</typeparam>
    /// <param name="eventName">Name of the event.</param>
    public void UnsubscribeDynamic<TH>(string eventName)
        where TH : IDynamicIntegrationEventHandler
    {
        _subsManager.RemoveDynamicSubscription<TH>(eventName);
    }

    /// <summary>
    /// Detaches from the subscriptions manager, disposes the consumer channel and
    /// clears all subscriptions.
    /// </summary>
    public void Dispose()
    {
        // Detach first: the manager can outlive this bus, and its event would otherwise
        // keep this instance alive and call back into a disposed channel.
        _subsManager.OnEventRemoved -= SubsManager_OnEventRemoved;

        if (_consumerChannel != null)
        {
            _consumerChannel.Dispose();
        }

        _subsManager.Clear();
    }

    /// <summary>
    /// Registers an async consumer on the consumer channel with manual acknowledgement.
    /// </summary>
    /// <remarks>
    /// NOTE(review): every Subscribe/SubscribeDynamic call reaches this method, so each
    /// call registers another consumer on the same channel — confirm this is intended.
    /// </remarks>
    private void StartBasicConsume()
    {
        _logger.LogTrace("Starting RabbitMQ basic consume");

        if (_consumerChannel != null)
        {
            var consumer = new AsyncEventingBasicConsumer(_consumerChannel);

            consumer.Received += Consumer_Received;

            _consumerChannel.BasicConsume(queue: _queueName,
                autoAck: false,
                consumer: consumer);
        }
        else
        {
            _logger.LogError("StartBasicConsume can't call on _consumerChannel == null");
        }
    }

    /// <summary>
    /// Handles a delivered message: decodes it as UTF-8, processes it, and acknowledges
    /// it whether or not processing succeeded.
    /// </summary>
    private async Task Consumer_Received(object sender, BasicDeliverEventArgs eventArgs)
    {
        var eventName = eventArgs.RoutingKey;
        var message = Encoding.UTF8.GetString(eventArgs.Body.Span);

        try
        {
            if (message.ToLowerInvariant().Contains("throw-fake-exception"))
            {
                throw new InvalidOperationException($"Fake exception requested: \"{message}\"");
            }

            await ProcessEvent(eventName, message);
        }
        catch (Exception ex)
        {
            _logger.LogWarning(ex, "----- ERROR Processing message \"{Message}\"", message);
        }

        // Even on exception we take the message off the queue.
        // In a real-world app this should be handled with a Dead Letter Exchange (DLX).
        // For more information see: https://www.rabbitmq.com/dlx.html
        _consumerChannel.BasicAck(eventArgs.DeliveryTag, multiple: false);
    }

    /// <summary>
    /// Creates the consumer channel, declares the exchange and the durable queue, and
    /// arranges for the channel to be recreated when it raises a callback exception.
    /// </summary>
    private IModel CreateConsumerChannel()
    {
        if (!_persistentConnection.IsConnected)
        {
            _persistentConnection.TryConnect();
        }

        _logger.LogTrace("Creating RabbitMQ consumer channel");

        var channel = _persistentConnection.CreateModel();

        channel.ExchangeDeclare(exchange: BROKER_NAME,
            type: "direct");

        channel.QueueDeclare(queue: _queueName,
            durable: true,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        channel.CallbackException += (sender, ea) =>
        {
            _logger.LogWarning(ea.Exception, "Recreating RabbitMQ consumer channel");

            _consumerChannel.Dispose();
            _consumerChannel = CreateConsumerChannel();
            StartBasicConsume();
        };

        return channel;
    }

    /// <summary>
    /// Dispatches a message to every handler registered for the event name, resolving
    /// each handler from a child Autofac scope. Handlers that cannot be resolved are skipped.
    /// </summary>
    /// <param name="eventName">Routing key of the message, used as event name.</param>
    /// <param name="message">JSON text of the message body.</param>
    private async Task ProcessEvent(string eventName, string message)
    {
        _logger.LogTrace("Processing RabbitMQ event: {EventName}", eventName);

        if (!_subsManager.HasSubscriptionsForEvent(eventName))
        {
            _logger.LogWarning("No subscription for RabbitMQ event: {EventName}", eventName);
            return;
        }

        using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
        {
            var subscriptions = _subsManager.GetHandlersForEvent(eventName);
            foreach (var subscription in subscriptions)
            {
                if (subscription.IsDynamic)
                {
                    var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
                    if (handler == null) continue;

                    dynamic eventData = JObject.Parse(message);

                    await Task.Yield();
                    await handler.Handle(eventData);
                }
                else
                {
                    var handler = scope.ResolveOptional(subscription.HandlerType);
                    if (handler == null) continue;

                    var eventType = _subsManager.GetEventTypeByName(eventName);
                    var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
                    // Handle is invoked through the closed generic interface because the
                    // event type is only known at run time.
                    var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);

                    await Task.Yield();
                    await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] { integrationEvent });
                }
            }
        }
    }
}
新建事件模型类和事件处理程序实现类
事件模型类继承IntegrationEvent基类,事件处理程序类实现IIntegrationEventHandler接口
/// <summary>
/// Integration event that carries the id of the user to query.
/// </summary>
public class UserQueryIntegrationEvent : IntegrationEvent
{
    /// <summary>Creates the event for the given user id.</summary>
    /// <param name="userid">Id of the user to query.</param>
    public UserQueryIntegrationEvent(string userid)
    {
        UserId = userid;
    }

    /// <summary>Id of the user to query.</summary>
    public string UserId { get; private set; }
}
/// <summary>
/// Handles <see cref="UserQueryIntegrationEvent"/> by querying the user through the user service.
/// </summary>
public class UserQueryIntegrationEventHandler : IIntegrationEventHandler<UserQueryIntegrationEvent>
{
    private readonly IUserServices _userServices;
    private readonly ILogger<UserQueryIntegrationEventHandler> _logger;

    /// <summary>Creates the handler.</summary>
    /// <param name="userServices">User service used to run the query.</param>
    /// <param name="logger">Logger.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is null.</exception>
    public UserQueryIntegrationEventHandler(
        IUserServices userServices,
        ILogger<UserQueryIntegrationEventHandler> logger)
    {
        _userServices = userServices;

        if (logger == null)
        {
            throw new ArgumentNullException(nameof(logger));
        }

        _logger = logger;
    }

    /// <summary>
    /// Logs the event to the logger and the console, then queries the user by the id in the event.
    /// </summary>
    /// <param name="event">The received event.</param>
    public async Task Handle(UserQueryIntegrationEvent @event)
    {
        _logger.LogInformation("----- Handling integration event: {IntegrationEventId} at {AppName} - ({@IntegrationEvent})", @event.Id, "Core", @event);

        var consoleLine = $"----- Handling integration event: {@event.Id} at Core - ({@event})";
        ConsoleHelper.WriteSuccessLine(consoleLine);

        var userId = @event.UserId.ToString();
        await _userServices.QueryById(userId);
    }
}
依赖注入EventBus服务
/// <summary>
/// Service-collection extension that registers the event bus services.
/// </summary>
public static class EventBusSetup
{
    /// <summary>
    /// When "EventBus:Enabled" is true, registers the in-memory subscriptions manager
    /// and the user-query handler; when "RabbitMQ:Enabled" is also true, registers
    /// <see cref="EventBusRabbitMQ"/> as the singleton <see cref="IEventBus"/>.
    /// </summary>
    /// <param name="services">The service collection to add the registrations to.</param>
    /// <exception cref="ArgumentNullException"><paramref name="services"/> is null.</exception>
    public static void AddEventBusSetup(this IServiceCollection services)
    {
        if (services == null)
        {
            throw new ArgumentNullException(nameof(services));
        }

        var eventBusEnabled = AppSettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool();
        if (!eventBusEnabled)
        {
            return;
        }

        // Passed to the bus as its queue name.
        var subscriptionClientName = AppSettings.app(new string[] { "EventBus", "SubscriptionClientName" });

        services.AddSingleton<IEventBusSubscriptionsManager, InMemoryEventBusSubscriptionsManager>();
        services.AddTransient<UserQueryIntegrationEventHandler>();

        var rabbitMqEnabled = AppSettings.app(new string[] { "RabbitMQ", "Enabled" }).ObjToBool();
        if (!rabbitMqEnabled)
        {
            return;
        }

        services.AddSingleton<IEventBus, EventBusRabbitMQ>(sp =>
        {
            var rabbitMQPersistentConnection = sp.GetRequiredService<IRabbitMQPersistentConnection>();
            var iLifetimeScope = sp.GetRequiredService<ILifetimeScope>();
            var logger = sp.GetRequiredService<ILogger<EventBusRabbitMQ>>();
            var eventBusSubcriptionsManager = sp.GetRequiredService<IEventBusSubscriptionsManager>();

            // Falls back to 5 retries when "RabbitMQ:RetryCount" is missing or empty.
            var retryCount = string.IsNullOrEmpty(AppSettings.app(new string[] { "RabbitMQ", "RetryCount" }))
                ? 5
                : int.Parse(AppSettings.app(new string[] { "RabbitMQ", "RetryCount" }));

            return new EventBusRabbitMQ(
                rabbitMQPersistentConnection,
                logger,
                iLifetimeScope,
                eventBusSubcriptionsManager,
                subscriptionClientName,
                retryCount);
        });
    }
}
// Register the event bus services (nothing is registered unless "EventBus:Enabled" is true).
builder.Services.AddEventBusSetup();
创建EventBus后台任务
/// <summary>
/// Hosted service that registers the event bus subscriptions when the host starts.
/// </summary>
public class EventBusHostedService : IHostedService
{
    private readonly IServiceProvider _serviceProvider;
    private readonly ILogger<EventBusHostedService> _logger;

    /// <summary>Creates the hosted service.</summary>
    /// <param name="serviceProvider">Provider used to resolve the event bus at start-up.</param>
    /// <param name="logger">Logger.</param>
    public EventBusHostedService(IServiceProvider serviceProvider, ILogger<EventBusHostedService> logger)
    {
        _serviceProvider = serviceProvider;
        _logger = logger;
    }

    /// <summary>Logs the start and registers the subscriptions.</summary>
    /// <param name="cancellationToken">Not observed by this implementation.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Start EventBus Service!");
        await DoWork();
    }

    // Subscribes the handler when "EventBus:Enabled" is true. The subscribed pair is the
    // user-query event and handler, matching the handler registered by EventBusSetup and
    // the event published by the test action; the Blog* types referenced previously are
    // not defined or registered in this walkthrough.
    private Task DoWork()
    {
        if (AppSettings.app(new string[] { "EventBus", "Enabled" }).ObjToBool())
        {
            var eventBus = _serviceProvider.GetRequiredService<IEventBus>();
            eventBus.Subscribe<UserQueryIntegrationEvent, UserQueryIntegrationEventHandler>();
        }

        return Task.CompletedTask;
    }

    /// <summary>Logs the stop; nothing else is done on shutdown.</summary>
    /// <param name="cancellationToken">Not observed by this implementation.</param>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Stop EventBus Service!");
        return Task.CompletedTask;
    }
}
// Register the hosted service that sets up the event bus subscriptions at start-up.
builder.Services.AddHostedService<EventBusHostedService>();
配置host
// Use Autofac as the service provider so that ILifetimeScope can be resolved by the event bus registration.
builder.Host.UseServiceProviderFactory(new AutofacServiceProviderFactory());
测试
/// <summary>
/// Test endpoint for the RabbitMQ event bus: publishes a
/// <c>UserQueryIntegrationEvent</c> for the given user id.
/// </summary>
/// <param name="_eventBus">Event bus resolved from the service container.</param>
/// <param name="userId">User id carried by the published event; defaults to "1".</param>
[HttpGet]
[AllowAnonymous]
public void EventBusTry([FromServices] IEventBus _eventBus, string userId = "1")
{
    _eventBus.Publish(new UserQueryIntegrationEvent(userId));
}