进程间通信组件ZeroMQ详解

news/2025/1/10 21:30:42/文章来源:https://www.cnblogs.com/hsiang/p/18622806

在一些复杂的项目中,往往会由不同功能的程序组成,且在程序运行期间,各个程序还需要进行互相通信,实现进程间通信的方式有很多种,最常用的就是通过消息中间件,比如RabbitMQ,Kafka,以及ZeroMQ等,而RabbitMQ和Kafka这两款中间件往往都需要独立安装步骤才能使用,ZeroMQ却不需要独立安装部署,而是作为动态库直接在程序中引用即可。今天以一个简单的小例子,简述ZeroMQ的常见用法,仅供学习分享使用,如有不足之处,还请指正。

ZeroMQ (又被称为 ØMQ, 0MQ, or zmq),虽然看起来像是可嵌入的网络组件,实际上却是一款并发框架。ZeroMQ作为一款开源通用消息组件,通过Socket可以将原子消息通过不同协议(进程内,进程间,TCP和广播等)进行传输。基于ZeroMQ,可以由多种模式进行选择,如:fan-out,发布-订阅,任务分发,请求-应答等,并且支持1-N,N-N等多端通信。对于C#开发人员,ZeroMQ有两种方式可供选择,1. NetMQ,提供一个端口给C#;2. clrzmq4通过C#绑定到libzmq。而NetMQ正是ZeroMQ推荐的使用方式

ZeroMQ通信模式

 

ZeroMQ提供了多种通信模式,主要有以下几种:

  • 请求应答(Request-Response)模式,此模式是ZeroMQ所有通信方式中最简单的一种模式,当客户端发出请求时,期望得到应答,且必须得到应答,才算一个完整的通信。
  • 发布订阅(Publish-Subscriber)模式,此模式发送者并不直接发送消息给接收者,而是将要发送的消息进行分类,接收者根据分类只接收自己感兴趣的消息,这就是发布订阅模式。这里提及的消息分类,通常被称为主题(topic)或过滤器(filter)。ZeroMQ通过多种信息来表达主题内容,以字节数组或者字符串的形式表示。
  • 生产者-消费者(Push-Pull)模式,此模式是一个消息分发机制,主要用于任务并发和并行处理场景,正常情况下,一个或者多个生产者发送消息,而一个或多个消费者接收并处理这些消息,这种模式特别适合于需要高效任务分发的场景,如分布式计算,大数据处理等。
  • 路由经销商(Router-Dealer)模式,此种模式是请求回复模式的一种升级,当Router收到消息的时候,会自动在消息的前面添加一帧,用来识别发送端的地址。当发送一个消息的时候,需要先发送一帧对端的地址,然后再发送消息,如果目的地址指向的对端不存在了,这个消息就会被丢弃。对端的地址默认情况下由ZMQ来产生一个唯一标识UUID。DEALER可以任意读写,不需要额外的地址帧,当有多个对端的时候,循环给单个对端发送消息。(注意:不是群发消息,与PUB不同)。
  • 多发布订阅(XPub-XSub),通常情况下,发布订阅模式适用于一个发布者,多个订阅者的场景。如果需要多个消息发布者,那XPub-XSub模式将会比较适用。

本文主要讲解请求应答模式和发布订阅模式,其他通信模式,如果感兴趣可以参考官方文档。

请求应答

请求应答(Request-Response),此模式是ZeroMQ所有通信方式中最简单的一种模式,当客户端发出请求时,期望得到应答,且必须得到应答,才算一个完整的通信。请求应答模式是同步阻塞模式,如果发送消息顺序错误,会抛出异常。正确的请求应答顺序如下:

  1. 请求端(RequestSocket)发送一个消息
  2. 响应端(ResponseSocket)阅读请求消息
  3. 响应端(ResponseSocket)发送响应信息
  4. 请求端(RequestSocket)接收响应端(ResponseSocket)发送的信息。

请求应答模式,主要由RequestSocket和ResponseSocket组成,实现消息的请求和应答。

请求端发送消息之前,需要先进行连接Connect,然后才能发送消息。示例代码如下所示:

public class ZeroMQRequest:IDisposable
{public Action<string> Received;public Action<string> Sended;private string url = string.Empty;private RequestSocket request;public ZeroMQRequest(string url){this.request = new RequestSocket();this.url = url;}public void Connect(){request.Connect(this.url);}public void BeginReceive(){string msg = this.request.ReceiveFrameString();Received?.Invoke(msg);}public void SendMsg(string msg){this.request.SendFrame(msg);if (Sended != null){Sended.Invoke(msg);}}public void Disconnect(){request.Disconnect(this.url);}public void Dispose(){request.Close();request.Dispose();}
}

响应端接收消息之前,需要先进行绑定(Bind)到对应的网络端口,然后才能接收消息。示例代码如下所示:

public class ZeroMQResponse:IDisposable
{public Action<string> Received;public Action<string> Sended;private string url = string.Empty;private ResponseSocket response;public ZeroMQResponse(string url){this.url = url;this.response = new ResponseSocket();this.response.Bind(this.url);}public void BeginReceive(){Task.Run(() =>{while (true){string msg = this.response.ReceiveFrameString();Received?.Invoke(msg);//收到回复Send("Ok");}});}public void Send(string msg){this.response.SendFrame(msg);if (Sended != null){Sended.Invoke(msg);}}public void Dispose(){this.response.Dispose();}
}

上述代码是将ReuqestSocket和ResponseSocket进行封装,并通过委托Action公开了接收和发送后响应接口,在使用时进行调用即可。

请求应答模式示例截图如下所示:

由于请求应答模式是阻塞模式,如果没有发送就调用接收方法,或连续调用接收方法,或连续发送(发送没有响应就再次发送),则会抛出异常。

连续两次发送,异常信息如下所示:

连续两次接收,异常信息如下所示:

发布订阅

发布订阅模式,将要发送的信息按照主题(Topic)进行分类,哪个接收端订阅了这个主题,就接收对应的消息,而不是直接发发送给接收者,这样有助于对消息进行分类处理。所以发布订阅模式并非阻塞模式,也不是一对一的请求响应,而是按需分类,异步响应模式。此模式主要有PublisherSocket和SubscriberSocket两个类,分别用于处理消息的发布和订阅。正确的发布订阅顺序,如下所示:

  1. 发送端:定义PubliserSocket对象,并绑定(Bind)到指定端口。然后发送主题和消息。
  2. 接收端:定义SubscriberSocket对象,连接到指定端口,订阅主题,接收指定主题的消息。

消息发布类(PublisherSocket),在消息发送之前,首先绑定一个端口,然后才能发送主题和消息,示例代码如下所示:

public class ZeroMQPublisher : IDisposable
{private string url=string.Empty;private PublisherSocket publisher;public Action<string> Sended;public ZeroMQPublisher(string url){this.url = url;this.publisher = new PublisherSocket();this.publisher.Bind(url);}public void Send(string topic,string msg){this.publisher.SendMoreFrame(topic);this.publisher.SendFrame(msg);if(Sended != null){Sended.Invoke($"send msg,topic:{topic},msg:{msg},time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");}}public void Dispose(){this.publisher.Close();this.publisher.Dispose();}
}

消息订阅类(SubscriberSocket),在消息接收之前,首先连接端口,订阅主题(Subscribe方法),然后才能进行消息的接收,示例代码如下所示:

public class ZeroMQSubscriber : IDisposable
{private string url=string.Empty;private SubscriberSocket subscriber;public Action<string> Received;private bool isRunning = false;public ZeroMQSubscriber(string url){this.url = url;this.subscriber = new SubscriberSocket();this.subscriber.Connect(url);this.subscriber.Subscribe(string.Empty);this.isRunning = true;}public void BeginReceive(){Task.Run(() =>{while(isRunning){var topic = this.subscriber.ReceiveFrameString();var msg = this.subscriber.ReceiveFrameString();if(Received != null){Received.Invoke($"received msg,topic:{topic},msg:{msg},time is {DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss")}");}}});}public void DisConnect(){isRunning = false;this.subscriber.Disconnect(this.url);}public void Dispose(){this.isRunning=false;this.subscriber.Close();this.subscriber?.Dispose();}
}

注意,发布订阅模式是单向触发的,即消息发布者,不可以接收消息;消息接收者,也不可以发布消息。接收端在调用Subscribe方法时,如果主题为空,则表示可以订阅任何主题。

发布订阅模式示例截图如下所示:

源码下载

关注老码识途公众号,回复关键字ZeroMQ,即可获取示例源码,如下图所示:

以上就是《进程间通信组件库ZeroMQ详解》的全部内容。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/861297.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Mongodb安装步骤 (.msi安装方式)

我之前发的Mongodb安装步骤 ,被人建议使用 .msi安装方式 所以重新发一版 Mongodb安装步骤 (.msi安装方式) 一、首先下载安装程序 下载链接 Try MongoDB Community Edition | MongoDB 选择 .msi 二、安装 1、双击.msi 2、next: 3、勾选接受,next: 4、complete是默认安装…

[Windows] 数据恢复软件R-Studio 8.14.179623

R-Studio是一个功能强大、节省成本的反删除和数据恢复软件系列。它采用独特的数据恢复新技术,为恢复FAT12/16/32、NTFS、NTFS5(由 Windows 2000/XP/2003/Vista/Windows 8/Windows 10创建或更新)、Ext2FS/Ext3FS(OSX LINUX 文件系统)以及 UFS1/UFS2(FreeBSD/OpenBSD/NetBS…

期末考试

每日总结管理系统: 此软件的主要用户包括学生、小组长、教师;各个用户主要功能为: (1)学生:写日报,修改日报,浏览日报、查询个人日报,查看站内消息。 (2)小组长:日报形式审核,发表情况统计,日报消息,查询日报。 (3)教师:日报评分,发表情况统计,日报消息,查…

【转载】什么是Banner以及测试时需要注意的点

设计中的banner图和测试中的banner图大家好,我是莫宁。相信很多新手小白在近几年也有听说过“banner”吧,是不是很疑惑。反正莫宁刚入门的时候是对这个词很陌生,不知道什么。今天就来为各位小伙伴解答这个疑问吧!什么是Banner?大家都知道“banner”翻译过来是横幅的意思,…

什么是Banner以及测试时需要注意的点

设计中的banner图和测试中的banner图大家好,我是莫宁。相信很多新手小白在近几年也有听说过“banner”吧,是不是很疑惑。反正莫宁刚入门的时候是对这个词很陌生,不知道什么。今天就来为各位小伙伴解答这个疑问吧!什么是Banner?大家都知道“banner”翻译过来是横幅的意思,…

CDS标准视图:功能位置可用标签 I_FUNCNLLOCALTERNATIVELABEL

视图名称:功能位置可用标签 I_FUNCNLLOCALTERNATIVELABEL 视图类型:基础 视图代码:点击查看代码 @EndUserText.label: Functional Location Alternative Labels @VDM.viewType: #BASIC @AccessControl.authorizationCheck: #CHECK @AbapCatalog.sqlViewName: IFUNCTLOCALTLB…

大学微积分 AB 第六单元-4:变革的整合与积累(微分方程简介、可分离方程简介)

微积分的关系 微积分的基本定理将微分与积分联系起来,表明在某种意义上,微分和积分是互为反操作的。具体而言:若你首先对一个函数进行积分(求其原函数),然后对这个原函数进行微分,那么你会得到最初的函数。 反之亦然,若你对一个函数进行微分然后进行积分,你将得到相同…

windows如何安装JDK8?附安装包

前言 大家好,我是小徐啊。我们在开发Java的时候,第一步就是需要安装JDK,今天,小徐来介绍下Java开发中,最流行的JDK8的安装方式。文末附获取方式。 如何安装JDK8 首先,双击JDK8的安装包,开始安装。然后,会弹出Java SE8的一些说明,查看一下,然后点击下一步按钮即可。然…

最新扣子(Coze)实战案例:小红书爆款小新歪理漫画,批量处理节点的使用详细讲解,手把手教学,完全免费教程

今天通过一个小红书爆款漫画《小新歪理》来为大家讲解Coze中批处理节点的使用。 先看生成后的效果:接下来,话不多说,斜杠君用最简单的方式教给大家。 大家可以关注收藏,以免之后找不到,而且也不会错过我后面的教程。 网页链接​mp.weixin.qq.com/s/74WlVI7nCBirDQfMEynQJQ…

北京某新能源汽车生产及办公网络综合监控项目

北京智和信通监控运维方案,基于核心产品智和网管平台SugarNMS,全面覆盖并管理用户生产与办公环境中的各类网络设备、安全设施、服务器及主机等信息技术基础设施,并成为提升企业运营效率、保障生产线稳定运行的重要工具 北京某新能源汽车是某世界500强汽车集团旗下的新…

如何升级云服务器操作系统而不丢失现有数据?

问题描述: 随着业务的发展和技术的进步,用户希望能够对其云服务器的操作系统进行升级,以享受更高版本带来的新特性和性能改进。然而,担心在升级过程中会导致原有应用程序和服务的数据丢失。请问在不影响业务的前提下,怎样安全地完成操作系统升级? 解决方案: 当计划对云服…

前端必备基础系列(五)V8引擎和内存管理

浏览器的内核主要是由两部分组成的,以webkit为例: WebCore:负责HTML解析、布局、渲染等相关工作; JavaScriptCore:解析、执行JavaScript代码; 常见的JavaScript引擎:V8 是Chrome浏览器和Node.js的JavaScript引擎 JavaScriptCore:是Webkit浏览器引擎的一部分,主要用于A…