关于使用grpc
,MediatR
/MassTransit
整合数十个工厂的同步程序
写在开头
我负责的模块下有十几个winform同步程序,他们之前是通过window 计划任务
运行的,我觉得维护起来很麻烦,后来使用hangfire
合并了。但是紧接着第二个问题来了,由于每个工厂都有自己的数据库和服务器,所以我要去这二十多个服务器上都部署一遍相同的服务。在过去的日子里,我一直是哪个工厂提出了问题,就去更新哪个工厂的任务调度服务。
你可以发现这里的问题,事实上所有的服务都有问题,但是我的精力有限,没办法一个一个的更新。
于是我之前一直想的是能否利用Jekins
做自动化部署呢,我只需要执行一遍脚本,所有服务器上的服务就都被更新了。不过工厂的服务器都在内网里,我们的代码仓库在外面,这个想法只能就此作罢。
最近突发奇想,假设有一个中心服务器,它是一个事件处理程序,而数十个工厂服务器上,原先的任务调度服务不再是执行具体的逻辑代码,而是向中心服务器发送一个事件。比如维护计划生成事件
,而处理这个事件的代码在中心服务器里。
这样的话,我只需要更新中心服务器的代码就好了,但是这里还有一个问题,也就是
可扩展性
,假如我要新增一个任务,那我不又要去每个工厂的服务器更新代码吗?
实现
MassTransit
把步子迈大点,同步程序可以把任务归类一下,比如每5min
,每1h
,每天8点
,把情况都概括一下,让我们不用再动工厂的服务器了。
上面的中心服务器听起来像什么?是不是有点像消息总线了,让我们来看看MassTransit
吧,在这里我使用了rabbitmq
。
//EMS.FactoryClient
builder.Services.AddMassTransit(x =>
{x.UsingRabbitMq((context,cfg) =>{//记得开启rabbitmq相关端口哦cfg.Host("ip", "/", h => {h.Username("admin");h.Password("123456");});cfg.ConfigureEndpoints(context);});
});//配置masstransitpublic class TaskService(Ibus bus){public async Task FiveMinTask(){await bus.Publish(new FiveMinEvent { FactoryCode = "xxxx" });//在这里发布事件}
}//EMS.Domain
public record FiveMinEvent
{public string FactoryCode{ get; set;}
}//5分钟事件(需要保证该事件在两个服务中的命名空间一致)//EMS.FactoryServer
builder.Services.AddMassTransit(x =>x.AddConsumer<FiveMinEventHandler>();......//与上同
});
public class FiveMinEventHandler:IConsumer<FiveMinEvent>
{//在这里消费事件public async Task Consume(ConsumeContext<FiveMinEvent> context){Console.WriteLine("FiveMinEventHandler:"+context.Message.FactoryCode);await Task.CompletedTask;}
}
我们已经完成了第一步,我前面提到每个工厂都有自己数据库,我传入
FactoryCode
的作用便是通过它生成对应的DbContext
.
让我们来实现一个简易的工厂模式:
public interface IDbcontextFactory{EquipContext CreateContext(string factory);}
public class DbcontextFactory(IConfiguration configuration) : IDbcontextFactory{private string connectionstring { get; set; }public EquipContext CreateContext(string factory){connectionstring = configuration[$"Factory:{factory}"];if (!string.IsNullOrEmpty(connectionstring)) return new EquipContext(connectionstring);throw new ApplicationException("错误!");}}
public class EquipContext(string conn) : DbContext{private readonly string connectionString = conn;protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder){optionsBuilder.UseSqlServer(connectionString);}
}
services.AddScoped<IDbcontextFactory, DbcontextFactory>();//别忘了注入服务//生成对应的context
await using var context = dbcontextFactory.CreateContext(context.Message.FactoryCode);
好像差不多了,但是其实这还藏着一个问题,我使用了rabbitmq
作为传输者,单个节点的它,一旦崩溃,我的系统就玩完了,你说我们可以搭建rabbitmq集群来保证其高可用性,可工厂清一色的windows server,且我们团队也没什么运维,我不想如此费心,怎么办呢?让我们看看本文的另外一个主角。
grpc
来看看微软官方文档对其的描述:
-
现代高性能轻量级 RPC 框架。
-
协定优先 API 开发,默认使用协议缓冲区,允许与语言无关的实现。
-
可用于多种语言的工具,以生成强类型服务器和客户端。
-
支持客户端、服务器和双向流式处理调用。
-
使用 Protobuf 二进制序列化减少对网络的使用。
先编写一个
Protobuf
文件syntax = "proto3";option csharp_namespace = "FactoryServer";package equipment;service Equipment {// 5鍒嗛挓rpc FiveMin (Request) returns (Reply);// 30鍒嗛挓rpc ThirtyMin (Request) returns (Reply);// 1灏忔椂rpc OneHour (Request) returns (Reply);// 7鐐归挓rpc SevenClock (Request) returns (Reply);// 8鐐归挓rpc EightClock (Request) returns (Reply);// 姣忔湀1鍙?rpc OneDay (Request) returns (Reply); }message Request {string factoryCode = 1; }message Reply {string message = 1; }
Install-Package Grpc.AspNetCore
<ItemGroup><Protobuf Include="Protos\equipment.proto" GrpcServices="Server" />
</ItemGroup>
接下来编写服务类
public class EquipmentService(ILogger<EquipmentService> logger):Equipment.EquipmentBase{public override Task<Reply> FiveMin(Request request, ServerCallContext context){return Task.FromResult(new Reply{Message = "FiveMin :"+request.FactoryCode});}}//Program.cs
builder.Services.AddGrpc();
var app = builder.Build();
app.MapGrpcService<EquipmentService>();
然后是客户端
客户端需要先安装三个包:
Install-Package Grpc.Net.ClientFactory
Install-Package Google.Protobuf
Install-Package Grpc.Tools
builder.Services.AddGrpcClient<Equipment.EquipmentClient>(o =>
{o.Address = new Uri("http://ip:port");
});
builder.Services.AddScoped<EquipmentService>();public class EquipmentService(Equipment.EquipmentClient client)
{public async Task<string> GetEquipment(){var request = new Request{ FactoryCode = "xxxx" };var reply = await client.FiveMinAsync(request);return reply.Message;}
}
到这里总算要结束了吗,是不是还忘记了什么,前面说到,我们通过同步频率进行了分组,那对应的服务端方法要调用好几个方法
MediatR
这是一个好东西,它可以:
- 解藕,提升代码的可维护性及可扩展性
- 遵循单一职责原则,每个处理程序只处理特定类型的请求
- 方便进行单元测试
- 管道,与
asp.net core
的中间件管道类似
让我们继续吧
builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(Program).Assembly));public record MaintenancePlanEvent( string FactoryCode) : INotification;
public record MaintenanceTaskEvent( string FactoryCode) : INotification;public class MaintenancePlanEventHandler(IDbcontextFactory factory, ILogger<MaintenancePlanEventHandler> logger) : INotificationHandler<MaintenancePlanEvent>{public async Task Handle(MaintenancePlanEvent notification, CancellationToken cancellationToken){using var context = factory.CreateContext(notification.FactoryCode);......//业务逻辑}}
回到服务类:
public class EquipmentService(ILogger<EquipmentService> loggerIMediator,IMediator mediator):Equipment.EquipmentBase{public override Task<Reply> FiveMin(Request request, ServerCallContext context){await mediator.Publish(new MaintenancePlanEvent(request.FactoryCode));await mediator.Publish(new MaintenanceTaskEvent(request.FactoryCode));........//其余的return Task.FromResult(new Reply{Message = "FiveMin :"+request.FactoryCode});}}
到此,总算是结束了。
写在最后
经过一番折腾,总算解决了当下的问题,但是前方还会出现什么呢?