[转]ZeroMQ用法说明及C++示例

news/2025/1/23 13:11:12/文章来源:https://www.cnblogs.com/lyggqm/p/18291822

原文链接:https://blog.csdn.net/qq_40344790/article/details/130865273

 

ZMQ介绍

官网:https://zeromq.org/

Github:https://github.com/zeromq/libzmq

ZMQ(ZeroMQ)是一种高性能的异步消息传递库,它可以在不同的进程和机器之间进行消息传递。它提供了多种传输协议、通信模式和编程语言支持,并且非常易于使用。

ZMQ 的核心思想是将网络通信抽象出来成为 socket 概念,使用不同类型的 socket 可以实现不同的消息传递模式,例如请求-应答模式、发布-订阅模式、推送-拉取模式等。ZMQ 提供了 TCP、IPC、inproc 等多种传输协议,可以根据需要选择合适的协议。

 

常用的ZeroMQ URL格式:

使用TCP协议          TCP: "tcp://<address>:<port>"
进程内通信         in-process: "inproc://<name>"
 进程间通信 (Windows系统)   inter-process: "ipc://<path>" (Unix系统) 或 "ipc://<name>"
多播: (使用PGM协议)     "epgm://<address>:<port>"

多播: (使用UDP协议)      "epub://<address>:<port>" 

 

 

消息模式及示例

 

请求-应答模式

server.cpp

 1 #include <zmq.hpp>
 2 #include <iostream>
 3 
 4 int main() {
 5     // 创建上下文和套接字
 6     zmq::context_t context(1);
 7     zmq::socket_t socket(context, zmq::socket_type::rep);
 8 
 9     // 绑定到指定地址
10     socket.bind("tcp://*:5555");
11 
12     while (true) {
13         // 接收请求
14         zmq::message_t request;
15         socket.recv(request, zmq::recv_flags::none);
16 
17         // 打印请求内容
18         std::cout << "Received request: " << std::string(static_cast<char*>(request.data()), request.size()) << std::endl;
19 
20         // 发送响应
21         zmq::message_t response(5);
22         memcpy(response.data(), "World", 5);
23         socket.send(response, zmq::send_flags::none);
24     }
25 
26     return 0;
27 }

client.cpp

 1 #include <zmq.hpp>
 2 #include <iostream>
 3 
 4 int main() {
 5     // 创建上下文和套接字
 6     zmq::context_t context(1);
 7     zmq::socket_t socket(context, zmq::socket_type::req);
 8 
 9     // 连接到服务端地址
10     socket.connect("tcp://localhost:5555");
11 
12     // 发送请求
13     zmq::message_t request(5);
14     memcpy(request.data(), "Hello", 5);
15     socket.send(request, zmq::send_flags::none);
16 
17     // 接收响应
18     zmq::message_t response;
19     socket.recv(response, zmq::recv_flags::none);
20 
21     // 打印响应
22     std::cout << "Received response: " << std::string(static_cast<char*>(response.data()), response.size()) << std::endl;
23 
24     return 0;
25 }

发布-订阅模式

publisher.cpp

 1 #include <zmq.hpp>
 2 #include <string>
 3 #include <iostream>
 4 #include <unistd.h>
 5 
 6 int main() {
 7     // 创建上下文和套接字
 8     zmq::context_t context(1);
 9     zmq::socket_t socket(context, zmq::socket_type::pub);
10 
11     // 绑定到指定地址
12     socket.bind("tcp://*:5555");
13 
14     // 发布消息
15     int count = 0;
16     while (true) {
17         std::string topic = "Topic";
18         std::string message = "Message " + std::to_string(count);
19 
20         // 发布主题和消息
21         zmq::message_t topicMsg(topic.size());
22         memcpy(topicMsg.data(), topic.data(), topic.size());
23         socket.send(topicMsg, zmq::send_flags::sndmore);
24 
25         zmq::message_t messageMsg(message.size());
26         memcpy(messageMsg.data(), message.data(), message.size());
27         socket.send(messageMsg, zmq::send_flags::none);
28 
29         std::cout << "Published: " << topic << " - " << message << std::endl;
30 
31         count++;
32         sleep(1); // 每秒发布一条消息
33     }
34 
35     return 0;
36 }

subscriber.cpp

 1 #include <zmq.hpp>
 2 #include <string>
 3 #include <iostream>
 4 
 5 int main() {
 6     // 创建上下文和套接字
 7     zmq::context_t context(1);
 8     zmq::socket_t socket(context, zmq::socket_type::sub);
 9 
10     // 连接到发布者地址
11     socket.connect("tcp://localhost:5555");
12 
13     // 订阅所有主题
14     socket.setsockopt(ZMQ_SUBSCRIBE, "", 0);
15 
16     // 接收并处理消息
17     while (true) {
18         // 接收主题
19         zmq::message_t topicMsg;
20         socket.recv(topicMsg, zmq::recv_flags::none);
21         std::string topic(static_cast<char*>(topicMsg.data()), topicMsg.size());
22 
23         // 接收消息
24         zmq::message_t messageMsg;
25         socket.recv(messageMsg, zmq::recv_flags::none);
26         std::string message(static_cast<char*>(messageMsg.data()), messageMsg.size());
27 
28         std::cout << "Received: " << topic << " - " << message << std::endl;
29     }
30 
31     return 0;
32 }

推送-拉取模式

pusher.cpp

 1 #include <zmq.hpp>
 2 #include <string>
 3 #include <iostream>
 4 #include <unistd.h>
 5 
 6 int main() {
 7     // 创建上下文和套接字
 8     zmq::context_t context(1);
 9     zmq::socket_t socket(context, zmq::socket_type::push);
10 
11     // 绑定到指定地址
12     socket.bind("tcp://*:5555");
13 
14     // 发送消息
15     int count = 0;
16     while (true) {
17         std::string message = "Message " + std::to_string(count);
18 
19         // 发送消息
20         zmq::message_t messageMsg(message.size());
21         memcpy(messageMsg.data(), message.data(), message.size());
22         socket.send(messageMsg, zmq::send_flags::none);
23 
24         std::cout << "Pushed: " << message << std::endl;
25 
26         count++;
27         sleep(1); // 每秒推送一条消息
28     }
29 
30     return 0;
31 }

puller.cpp

 1 #include <zmq.hpp>
 2 #include <string>
 3 #include <iostream>
 4 
 5 int main() {
 6     // 创建上下文和套接字
 7     zmq::context_t context(1);
 8     zmq::socket_t socket(context, zmq::socket_type::pull);
 9 
10     // 连接到推送者地址
11     socket.connect("tcp://localhost:5555");
12 
13     // 接收消息
14     while (true) {
15         zmq::message_t messageMsg;
16         socket.recv(messageMsg, zmq::recv_flags::none);
17         std::string message(static_cast<char*>(messageMsg.data()), messageMsg.size());
18 
19         std::cout << "Pulled: " << message << std::endl;
20     }
21 
22     return 0;
23 }

进程间通信示例

sender.cpp

 1 #include <zmq.hpp>
 2 #include <string>
 3 #include <iostream>
 4 
 5 int main() {
 6     // 创建上下文和套接字
 7     zmq::context_t context(1);
 8     zmq::socket_t socket(context, zmq::socket_type::push);
 9 
10     // 连接到接收者的地址
11     socket.connect("ipc:///tmp/zmq_ipc_example");
12 
13     // 发送消息
14     std::string message = "Hello from sender!";
15     zmq::message_t messageMsg(message.size());
16     memcpy(messageMsg.data(), message.data(), message.size());
17     socket.send(messageMsg, zmq::send_flags::none);
18 
19     return 0;
20 }

receiver.cpp

 1 #include <zmq.hpp>
 2 #include <string>
 3 #include <iostream>
 4 
 5 int main() {
 6     // 创建上下文和套接字
 7     zmq::context_t context(1);
 8     zmq::socket_t socket(context, zmq::socket_type::pull);
 9 
10     // 绑定到指定地址
11     socket.bind("ipc:///tmp/zmq_ipc_example");
12 
13     // 接收消息
14     zmq::message_t messageMsg;
15     socket.recv(messageMsg, zmq::recv_flags::none);
16     std::string message(static_cast<char*>(messageMsg.data()), messageMsg.size());
17 
18     std::cout << "Received message: " << message << std::endl;
19 
20     return 0;
21 }

 

Router-Dealer消息路由

Router 模式是 ZeroMQ 中的一种复杂通信模式,用于创建灵活的消息路由系统。在 Router 模式下,ROUTER套接字可以接收来自多个客户端的请求,并将这些请求分发给多个工作线程或服务DEALER套接字。

Router-Dealer 通信模式可以用于实现负载均衡、消息路由和复杂的请求-响应模式,非常适合需要多个客户端和多个服务端进行交互的场景。

server.cpp

 1 #include <zmq.hpp>
 2 #include <iostream>
 3 #include <string>
 4 
 5 int main() {
 6     zmq::context_t context(1);
 7     zmq::socket_t router(context, ZMQ_ROUTER);
 8 
 9     // 绑定到端口 5555
10     router.bind("tcp://*:5555");
11 
12     while (true) {
13         zmq::message_t identity;
14         zmq::message_t message;
15 
16         // 接收来自 Dealer 的身份和消息
17         router.recv(&identity);
18         router.recv(&message);
19 
20         std::string identity_str = std::string(static_cast<char*>(identity.data()), identity.size());
21         std::string message_str = std::string(static_cast<char*>(message.data()), message.size());
22 
23         std::cout << "Received message from " << identity_str << ": " << message_str << std::endl;
24 
25         // 回复消息给 Dealer
26         std::string reply_message = "Hello from Router";
27         zmq::message_t reply(reply_message.size());
28         memcpy(reply.data(), reply_message.c_str(), reply_message.size());
29 
30         router.send(identity, ZMQ_SNDMORE);
31         router.send(reply);
32     }
33 
34     return 0;
35 }

client.cpp

 1 // g++ -o client client.cpp -lzmq -lpthread
 2 #include <zmq.hpp>
 3 #include <iostream>
 4 #include <string>
 5 #include <thread>
 6 
 7 void client_task() {
 8     zmq::context_t context(1);
 9     zmq::socket_t dealer(context, ZMQ_DEALER);
10 
11     // 连接到 Router
12     dealer.connect("tcp://localhost:5555");
13 
14     std::string identity = "Client1";
15 
16     // 设置 Dealer 的身份
17     dealer.setsockopt(ZMQ_IDENTITY, identity.c_str(), identity.size());
18 
19     // 发送消息给 Router
20     std::string request_message = "Hello from Dealer";
21     zmq::message_t request(request_message.size());
22     memcpy(request.data(), request_message.c_str(), request_message.size());
23 
24     dealer.send(request);
25 
26     // 接收 Router 的回复
27     zmq::message_t reply;
28     dealer.recv(&reply);
29 
30     std::string reply_str = std::string(static_cast<char*>(reply.data()), reply.size());
31 
32     std::cout << "Received reply from Router: " << reply_str << std::endl;
33 }
34 
35 int main() {
36     std::thread client_thread(client_task);
37     client_thread.join();
38 
39     return 0;
40 }

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/741148.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Solidity:assembly

在Solidity中,assembly是一个内嵌的低级语言,它允许开发者直接编写EVM(以太坊虚拟机)字节码。这种能力使得开发者可以更精细地控制智能合约的行为,并且在某些情况下可以提高性能和减少gas费用。然而,使用assembly也增加了代码的复杂性和出错的可能性,因此应谨慎使用。 为…

K8s 驱逐场景以及规避方案

Pod 驱逐场景总结从一个 SRE 角度看, Pod 驱逐分为两种情况:较安全驱逐 & 提高稳定性的良性驱逐API 发起驱逐,典型案例:kubectl drain Node Not Ready 时,Controller Manager 发起的驱逐有风险的驱逐节点压力驱逐节点磁盘空间不足、内存不足 或 Pid 不足, kubelet 发…

[二、状态管理]1状态管理概述

在前文的描述中,我们构建的页面多为静态界面。如果希望构建一个动态的、有交互的界面,就需要引入“状态”的概念。 图1 效果图上面的示例中,用户与应用程序的交互触发了文本状态变更,状态变更引起了UI渲染,UI从“Hello World”变更为“Hello ArkUI”。 在声明式UI编程框架…

lazarus 项目用到的控件

官网 https://www.lazarus-ide.org/ 基本上都是原生插件,能同时满足WINDOWS和Linux下开发 用到的控件

设置npm的registry几种方法

https://www.cnblogs.com/luludehuhuan/p/8017014.html相信坚持的力量,日复一日的习惯.

Etcd 高可用故障演练

目的 本次演练旨在测试 Kubernetes 的 etcd 高可用性,检验是否能够在其中一个 etcd 节点发生故障的情况下,其他 etcd 节点能够接管其工作,确保集群仍能正常运行。 集群架构演练场景 在一个三节点的 Kubernetes 集群中,我们将模拟其中一个 etcd 节点的故障,观察剩余的 etcd…

会计工作的关键一步——用免费可视化工具制作财务报表

会计工作中,关键一步就是把那些繁杂的财务数据整理成清晰易懂的财务报表,这就像是把一堆拼图块变成一幅完整的图画。山海鲸可视化这款免费工具,支持实时数据刷新,能够随时随地更新你的财务数据,确保你拿到的永远是最新鲜的“出炉”数据。操作也非常简单,零代码拖拽式界面…

JVM是如何创建一个对象的?

1. Java对象创建的流程是什么样? 2. JVM执行new关键字时都有哪些操作? 3. JVM在频繁创建对象时,如何保证线程安全? 4. Java对象的内存布局是什么样的? 5. 对象头都存储哪些数据?哈喽,大家好🎉,我是世杰。 本文我为大家介绍面试官经常考察的「Java对象创建流程」照例在…

Kubernetes-Master 基准测试

背景 Kubernetes是容器集群管理系统,为容器化的应用提供资源调度、部署运行、滚动升级、扩容缩容等功能。容器集群管理给业务带来了便利,但是随着业务的不断增长,应用数量可能会发生爆发式的增长。那在这种情况下,Kubernetes能否快速地完成扩容、扩容到大规模时Kubernetes管…

硬核案例分享,一文带你拆解PHP语言体系下的容器化改造

本文介绍了PHP语言体系应用现代化案例,实现了许多与业务无关的通用性应用改造方案,如PHP应用容器化架构方案、基于Prometheus的弹性伸缩方案等等,为此类型客户提供了一个可参考的案例。本文分享自华为云社区《PHP语言体系下的容器化改造,助力夺冠集团应用现代化》,作者: …

CoreDNS 概述及运维实践

概述 什么是 DNS ? 域名系统(英语:Domain Name System,缩写:DNS)是互联网的一项服务。它作为将域名和IP地址相互映射的一个分布式数据库,能够使人更方便地访问互联网。DNS使用TCP和UDP端口53。 DNS 不仅方便了人们访问不同的互联网服务,更为很多应用提供了,动态服务发…

从0开始装一套 KubeVirt 1.2.1

KubeVirt 架构 架构virt-api : 负责提供一些 KubeVirt 特有的 api,像是 console, vnc, startvm, stopvm 等。 virt-controller : 管理和监控 VMI 对象及其关联的 Pod,对其状态进行更新。 virt-hander : 以 DaemonSet 运行在每一个节点上,监听 VMI 的状态向上汇报,管理 VMI …