关于使用grpc,MediatRMassTransit 整合数十个工厂的同步程序

news/2025/4/1 6:45:25/文章来源:https://www.cnblogs.com/ssz0312/p/18799802

关于使用grpc,MediatR/MassTransit 整合数十个工厂的同步程序

写在开头

我负责的模块下有十几个winform同步程序,他们之前是通过window 计划任务运行的,我觉得维护起来很麻烦,后来使用hangfire合并了。但是紧接着第二个问题来了,由于每个工厂都有自己的数据库和服务器,所以我要去这二十多个服务器上都部署一遍相同的服务。在过去的日子里,我一直是哪个工厂提出了问题,就去更新哪个工厂的任务调度服务。

你可以发现这里的问题,事实上所有的服务都有问题,但是我的精力有限,没办法一个一个的更新。

于是我之前一直想的是能否利用Jekins做自动化部署呢,我只需要执行一遍脚本,所有服务器上的服务就都被更新了。不过工厂的服务器都在内网里,我们的代码仓库在外面,这个想法只能就此作罢。

最近突发奇想,假设有一个中心服务器,它是一个事件处理程序,而数十个工厂服务器上,原先的任务调度服务不再是执行具体的逻辑代码,而是向中心服务器发送一个事件。比如维护计划生成事件,而处理这个事件的代码在中心服务器里。

这样的话,我只需要更新中心服务器的代码就好了,但是这里还有一个问题,也就是可扩展性,假如我要新增一个任务,那我不又要去每个工厂的服务器更新代码吗?

实现

MassTransit

把步子迈大点,同步程序可以把任务归类一下,比如每5min,每1h,每天8点,把情况都概括一下,让我们不用再动工厂的服务器了。

上面的中心服务器听起来像什么?是不是有点像消息总线了,让我们来看看MassTransit吧,在这里我使用了rabbitmq

//EMS.FactoryClient
builder.Services.AddMassTransit(x =>
{x.UsingRabbitMq((context,cfg) =>{//记得开启rabbitmq相关端口哦cfg.Host("ip", "/", h => {h.Username("admin");h.Password("123456");});cfg.ConfigureEndpoints(context);});
});//配置masstransitpublic class TaskService(Ibus bus){public async Task FiveMinTask(){await bus.Publish(new FiveMinEvent { FactoryCode = "xxxx" });//在这里发布事件} 
}//EMS.Domain
public record FiveMinEvent
{public string FactoryCode{ get; set;}
}//5分钟事件(需要保证该事件在两个服务中的命名空间一致)//EMS.FactoryServer
builder.Services.AddMassTransit(x =>x.AddConsumer<FiveMinEventHandler>();......//与上同
});
public class FiveMinEventHandler:IConsumer<FiveMinEvent>
{//在这里消费事件public async Task Consume(ConsumeContext<FiveMinEvent> context){Console.WriteLine("FiveMinEventHandler:"+context.Message.FactoryCode);await Task.CompletedTask;}
}

我们已经完成了第一步,我前面提到每个工厂都有自己数据库,我传入FactoryCode的作用便是通过它生成对应的DbContext.

让我们来实现一个简易的工厂模式:

public interface IDbcontextFactory{EquipContext CreateContext(string factory);}
public class DbcontextFactory(IConfiguration configuration) : IDbcontextFactory{private string connectionstring { get; set; }public EquipContext CreateContext(string factory){connectionstring = configuration[$"Factory:{factory}"];if (!string.IsNullOrEmpty(connectionstring)) return new EquipContext(connectionstring);throw new ApplicationException("错误!");}}
public class EquipContext(string conn) : DbContext{private readonly string connectionString = conn;protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder){optionsBuilder.UseSqlServer(connectionString);}
}
services.AddScoped<IDbcontextFactory, DbcontextFactory>();//别忘了注入服务//生成对应的context
await using var context = dbcontextFactory.CreateContext(context.Message.FactoryCode);

好像差不多了,但是其实这还藏着一个问题,我使用了rabbitmq作为传输者,单个节点的它,一旦崩溃,我的系统就玩完了,你说我们可以搭建rabbitmq集群来保证其高可用性,可工厂清一色的windows server,且我们团队也没什么运维,我不想如此费心,怎么办呢?让我们看看本文的另外一个主角。

grpc

来看看微软官方文档对其的描述:

  • 现代高性能轻量级 RPC 框架。

  • 协定优先 API 开发,默认使用协议缓冲区,允许与语言无关的实现。

  • 可用于多种语言的工具,以生成强类型服务器和客户端。

  • 支持客户端、服务器和双向流式处理调用。

  • 使用 Protobuf 二进制序列化减少对网络的使用。

    先编写一个Protobuf文件

    syntax = "proto3";option csharp_namespace = "FactoryServer";package equipment;service Equipment {// 5鍒嗛挓rpc FiveMin (Request) returns (Reply);// 30鍒嗛挓rpc ThirtyMin (Request) returns (Reply);// 1灏忔椂rpc OneHour (Request) returns (Reply);// 7鐐归挓rpc SevenClock (Request) returns (Reply);// 8鐐归挓rpc EightClock (Request) returns (Reply);// 姣忔湀1鍙?rpc OneDay (Request) returns (Reply);
    }message Request {string factoryCode = 1;
    }message Reply {string message = 1;
    }
    
    Install-Package Grpc.AspNetCore
    
<ItemGroup><Protobuf Include="Protos\equipment.proto" GrpcServices="Server" />
</ItemGroup>

接下来编写服务类

 public class EquipmentService(ILogger<EquipmentService> logger):Equipment.EquipmentBase{public override Task<Reply> FiveMin(Request request, ServerCallContext context){return Task.FromResult(new Reply{Message = "FiveMin :"+request.FactoryCode});}}//Program.cs
builder.Services.AddGrpc();
var app = builder.Build();
app.MapGrpcService<EquipmentService>();

然后是客户端

客户端需要先安装三个包:

Install-Package Grpc.Net.ClientFactory
Install-Package Google.Protobuf
Install-Package Grpc.Tools

builder.Services.AddGrpcClient<Equipment.EquipmentClient>(o =>
{o.Address = new Uri("http://ip:port");
});
builder.Services.AddScoped<EquipmentService>();public class EquipmentService(Equipment.EquipmentClient client)
{public async Task<string> GetEquipment(){var request = new Request{ FactoryCode = "xxxx" };var reply = await client.FiveMinAsync(request);return reply.Message;}
}

到这里总算要结束了吗,是不是还忘记了什么,前面说到,我们通过同步频率进行了分组,那对应的服务端方法要调用好几个方法

MediatR

这是一个好东西,它可以:

  1. 解藕,提升代码的可维护性及可扩展性
  2. 遵循单一职责原则,每个处理程序只处理特定类型的请求
  3. 方便进行单元测试
  4. 管道,与asp.net core的中间件管道类似

让我们继续吧

builder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(Program).Assembly));public record MaintenancePlanEvent( string FactoryCode) : INotification;
public record MaintenanceTaskEvent( string FactoryCode) : INotification;public class MaintenancePlanEventHandler(IDbcontextFactory factory, ILogger<MaintenancePlanEventHandler> logger) : INotificationHandler<MaintenancePlanEvent>{public async Task Handle(MaintenancePlanEvent notification, CancellationToken cancellationToken){using var context = factory.CreateContext(notification.FactoryCode);......//业务逻辑}}

回到服务类:

 public class EquipmentService(ILogger<EquipmentService> loggerIMediator,IMediator mediator):Equipment.EquipmentBase{public override Task<Reply> FiveMin(Request request, ServerCallContext context){await mediator.Publish(new MaintenancePlanEvent(request.FactoryCode));await mediator.Publish(new MaintenanceTaskEvent(request.FactoryCode));........//其余的return Task.FromResult(new Reply{Message = "FiveMin :"+request.FactoryCode});}}

到此,总算是结束了。

写在最后

经过一番折腾,总算解决了当下的问题,但是前方还会出现什么呢?

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/907805.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[Vue] Vue模板编译原理解析 part 1

模板编译整体流程 首先我们看一下什么是编译? 所谓编译(Compile),指的是将语言 A 翻译成语言 B,语言 A 就被称之为源码(source code),语言 B 就被称之为目标代码(target code),这个事情谁来做?编译器来做。编译器你也不用想得那么神秘,就是一段程序而已。 完整的编…

[P] 结对项目:影蛇舞

项目 内容这个作业属于哪个课程 2025年春季软件工程(罗杰、任健) 这个作业的要求在哪里 [P] 结对项目:影蛇舞 我在这个课程的目标是 学习软件工程知识,通过团队协作开发一个具备实际应用价值的软件,从需求分析、设计、开发到测试和部署,完整经历软件开发生命周期,提高工…

0329-Never Give Up

前言 用LCD1602 去打印一段话吧。 就当作激励和安慰,毕竟,我今天被坏情绪传染了一下,有点点失落和悲伤。 参考 5-2 LCD1602调试器 效果 Never Give UpAttitude Is Everything代码 LCD1602.h #ifndef __LCD1602_H__ #define __LCD1602_H__//用户调用函数: void LCD_Init(); …

JavaScript 数据结构与算法 — 单向链表

链表(Linked List)是一种基本的数据结构,用于表示一组按顺序排列的元素。链表中的每个元素都与下一个元素连接,元素在内存中并不是连续的,而是通过指针来链接在一起。每个元素都包含两部分:自己的数据和指向下一个元素的指针。我们常说的链表指的是单向链表,第一个元素的…

Ethernaut通关(智能合约漏洞)(有缘更新)

SnowSword笑传之出错币Ethernaut通关 参考文章:文章 - Ethernaut闯关录(上) - 先知社区、 智能合约是什么?把智能合约想象成网络上的赛博自动售货机,每个人都可以写自己的智能合约,使用虚拟货币交易物品,并且网络区块链中的所有节点都在为你的交易记账不怕商家提桶跑路……

QML基本组件 滑动条 Slider

描述 Slider通过手柄沿轨迹滑动来设置数值。 Qt帮助文档搜索 “slider” 获取详细信息。属性 from : real to : real value : real orientation : enumeration stepsize : real touchDragThreshold : qreal信号 onValueChange {}import QtQuick import QtQuick.ControlsWindow …

UE5--006--小结(一)

1. Input2. AI Enmy3.Save Game

阶段升级,zhitan-ems开源能源管理系统--集成建筑能耗支路和分项功能

升级介绍 自从春节上班后开源以来,zhitan-ems收到了大家很多的赞誉和认可,很多朋友也提出了中肯的意见。感谢大家。 很多朋友的建议里提到建筑能耗功能,依据大家意见,我们加班加点实现了简单的建筑能耗功能。如下图: 另外打一波广告,欢迎大家star 项目介绍 通过物联网技…

Static Timing Analysis Basics

Preface This note only introduce the essential concepts about Static Timing Analysis, which not contains:Async, i.e. remove, recover Timing conceptions, i.e. false path, multi cycle path etc. Advance timing domain knowledgePOCV, MCMM etc.什么是 STA 由于时钟…

深圳大学的一些简单题

A打表,发现是这样的东西:然后规律很显然,相邻的两个数,一组在左边,另一组在右边,依次循环,偶数的时候是 \(23\) 开头,奇数的时候是 \(12\) 开头,再处理一下 \(1\) 和 \(n\) 就可以,比较简单的分讨 显然规律不止一个点击查看代码 #include <bits/stdc++.h> usin…

AI可解释性 I | 对抗样本(Adversarial Sample)论文导读(持续更新)

本文作为AI可解释性系列的第一部分,旨在以汉语整理并阅读对抗攻击(Adversarial Attack)相关的论文,并持续更新。与此同时,AI可解释性系列的第二部分:归因方法(Attribution)也即将上线,敬请期待。AI可解释性 I | 对抗样本(Adversarial Sample)论文导读(持续更新) 导…

day:33 jmeter性能测试——压力测试

一.单接口性能测试(同一用户进行压力测试) POST http://49.233.201.254:8080/cms/manage/loginJump.do POST data: userAccount=admin&loginPwd=123456(1)在线程中修改虚拟用户数线程组 线程数:虚拟用户数。一个虚拟用户占用一个进程或线程。设置多少虚拟用户数在这里…