原文链接:https://blog.csdn.net/qq_40344790/article/details/130865273
ZMQ介绍
官网:https://zeromq.org/
Github:https://github.com/zeromq/libzmq
ZMQ(ZeroMQ)是一种高性能的异步消息传递库,它可以在不同的进程和机器之间进行消息传递。它提供了多种传输协议、通信模式和编程语言支持,并且非常易于使用。
ZMQ 的核心思想是将网络通信抽象出来成为 socket 概念,使用不同类型的 socket 可以实现不同的消息传递模式,例如请求-应答模式、发布-订阅模式、推送-拉取模式等。ZMQ 提供了 TCP、IPC、inproc 等多种传输协议,可以根据需要选择合适的协议。
常用的ZeroMQ URL格式:
使用TCP协议 TCP: "tcp://<address>:<port>"
进程内通信 in-process: "inproc://<name>"
进程间通信 (Windows系统) inter-process: "ipc://<path>" (Unix系统) 或 "ipc://<name>"
多播: (使用PGM协议) "epgm://<address>:<port>"
多播: (使用UDP协议) "epub://<address>:<port>"
消息模式及示例
请求-应答模式
server.cpp
1 #include <zmq.hpp> 2 #include <iostream> 3 4 int main() { 5 // 创建上下文和套接字 6 zmq::context_t context(1); 7 zmq::socket_t socket(context, zmq::socket_type::rep); 8 9 // 绑定到指定地址 10 socket.bind("tcp://*:5555"); 11 12 while (true) { 13 // 接收请求 14 zmq::message_t request; 15 socket.recv(request, zmq::recv_flags::none); 16 17 // 打印请求内容 18 std::cout << "Received request: " << std::string(static_cast<char*>(request.data()), request.size()) << std::endl; 19 20 // 发送响应 21 zmq::message_t response(5); 22 memcpy(response.data(), "World", 5); 23 socket.send(response, zmq::send_flags::none); 24 } 25 26 return 0; 27 }
client.cpp
1 #include <zmq.hpp> 2 #include <iostream> 3 4 int main() { 5 // 创建上下文和套接字 6 zmq::context_t context(1); 7 zmq::socket_t socket(context, zmq::socket_type::req); 8 9 // 连接到服务端地址 10 socket.connect("tcp://localhost:5555"); 11 12 // 发送请求 13 zmq::message_t request(5); 14 memcpy(request.data(), "Hello", 5); 15 socket.send(request, zmq::send_flags::none); 16 17 // 接收响应 18 zmq::message_t response; 19 socket.recv(response, zmq::recv_flags::none); 20 21 // 打印响应 22 std::cout << "Received response: " << std::string(static_cast<char*>(response.data()), response.size()) << std::endl; 23 24 return 0; 25 }
发布-订阅模式
publisher.cpp
1 #include <zmq.hpp> 2 #include <string> 3 #include <iostream> 4 #include <unistd.h> 5 6 int main() { 7 // 创建上下文和套接字 8 zmq::context_t context(1); 9 zmq::socket_t socket(context, zmq::socket_type::pub); 10 11 // 绑定到指定地址 12 socket.bind("tcp://*:5555"); 13 14 // 发布消息 15 int count = 0; 16 while (true) { 17 std::string topic = "Topic"; 18 std::string message = "Message " + std::to_string(count); 19 20 // 发布主题和消息 21 zmq::message_t topicMsg(topic.size()); 22 memcpy(topicMsg.data(), topic.data(), topic.size()); 23 socket.send(topicMsg, zmq::send_flags::sndmore); 24 25 zmq::message_t messageMsg(message.size()); 26 memcpy(messageMsg.data(), message.data(), message.size()); 27 socket.send(messageMsg, zmq::send_flags::none); 28 29 std::cout << "Published: " << topic << " - " << message << std::endl; 30 31 count++; 32 sleep(1); // 每秒发布一条消息 33 } 34 35 return 0; 36 }
subscriber.cpp
1 #include <zmq.hpp> 2 #include <string> 3 #include <iostream> 4 5 int main() { 6 // 创建上下文和套接字 7 zmq::context_t context(1); 8 zmq::socket_t socket(context, zmq::socket_type::sub); 9 10 // 连接到发布者地址 11 socket.connect("tcp://localhost:5555"); 12 13 // 订阅所有主题 14 socket.setsockopt(ZMQ_SUBSCRIBE, "", 0); 15 16 // 接收并处理消息 17 while (true) { 18 // 接收主题 19 zmq::message_t topicMsg; 20 socket.recv(topicMsg, zmq::recv_flags::none); 21 std::string topic(static_cast<char*>(topicMsg.data()), topicMsg.size()); 22 23 // 接收消息 24 zmq::message_t messageMsg; 25 socket.recv(messageMsg, zmq::recv_flags::none); 26 std::string message(static_cast<char*>(messageMsg.data()), messageMsg.size()); 27 28 std::cout << "Received: " << topic << " - " << message << std::endl; 29 } 30 31 return 0; 32 }
推送-拉取模式
pusher.cpp
1 #include <zmq.hpp> 2 #include <string> 3 #include <iostream> 4 #include <unistd.h> 5 6 int main() { 7 // 创建上下文和套接字 8 zmq::context_t context(1); 9 zmq::socket_t socket(context, zmq::socket_type::push); 10 11 // 绑定到指定地址 12 socket.bind("tcp://*:5555"); 13 14 // 发送消息 15 int count = 0; 16 while (true) { 17 std::string message = "Message " + std::to_string(count); 18 19 // 发送消息 20 zmq::message_t messageMsg(message.size()); 21 memcpy(messageMsg.data(), message.data(), message.size()); 22 socket.send(messageMsg, zmq::send_flags::none); 23 24 std::cout << "Pushed: " << message << std::endl; 25 26 count++; 27 sleep(1); // 每秒推送一条消息 28 } 29 30 return 0; 31 }
puller.cpp
1 #include <zmq.hpp> 2 #include <string> 3 #include <iostream> 4 5 int main() { 6 // 创建上下文和套接字 7 zmq::context_t context(1); 8 zmq::socket_t socket(context, zmq::socket_type::pull); 9 10 // 连接到推送者地址 11 socket.connect("tcp://localhost:5555"); 12 13 // 接收消息 14 while (true) { 15 zmq::message_t messageMsg; 16 socket.recv(messageMsg, zmq::recv_flags::none); 17 std::string message(static_cast<char*>(messageMsg.data()), messageMsg.size()); 18 19 std::cout << "Pulled: " << message << std::endl; 20 } 21 22 return 0; 23 }
进程间通信示例
sender.cpp
1 #include <zmq.hpp> 2 #include <string> 3 #include <iostream> 4 5 int main() { 6 // 创建上下文和套接字 7 zmq::context_t context(1); 8 zmq::socket_t socket(context, zmq::socket_type::push); 9 10 // 连接到接收者的地址 11 socket.connect("ipc:///tmp/zmq_ipc_example"); 12 13 // 发送消息 14 std::string message = "Hello from sender!"; 15 zmq::message_t messageMsg(message.size()); 16 memcpy(messageMsg.data(), message.data(), message.size()); 17 socket.send(messageMsg, zmq::send_flags::none); 18 19 return 0; 20 }
receiver.cpp
1 #include <zmq.hpp> 2 #include <string> 3 #include <iostream> 4 5 int main() { 6 // 创建上下文和套接字 7 zmq::context_t context(1); 8 zmq::socket_t socket(context, zmq::socket_type::pull); 9 10 // 绑定到指定地址 11 socket.bind("ipc:///tmp/zmq_ipc_example"); 12 13 // 接收消息 14 zmq::message_t messageMsg; 15 socket.recv(messageMsg, zmq::recv_flags::none); 16 std::string message(static_cast<char*>(messageMsg.data()), messageMsg.size()); 17 18 std::cout << "Received message: " << message << std::endl; 19 20 return 0; 21 }
Router-Dealer消息路由
Router 模式是 ZeroMQ 中的一种复杂通信模式,用于创建灵活的消息路由系统。在 Router 模式下,ROUTER套接字可以接收来自多个客户端的请求,并将这些请求分发给多个工作线程或服务DEALER套接字。
Router-Dealer 通信模式可以用于实现负载均衡、消息路由和复杂的请求-响应模式,非常适合需要多个客户端和多个服务端进行交互的场景。
server.cpp
1 #include <zmq.hpp> 2 #include <iostream> 3 #include <string> 4 5 int main() { 6 zmq::context_t context(1); 7 zmq::socket_t router(context, ZMQ_ROUTER); 8 9 // 绑定到端口 5555 10 router.bind("tcp://*:5555"); 11 12 while (true) { 13 zmq::message_t identity; 14 zmq::message_t message; 15 16 // 接收来自 Dealer 的身份和消息 17 router.recv(&identity); 18 router.recv(&message); 19 20 std::string identity_str = std::string(static_cast<char*>(identity.data()), identity.size()); 21 std::string message_str = std::string(static_cast<char*>(message.data()), message.size()); 22 23 std::cout << "Received message from " << identity_str << ": " << message_str << std::endl; 24 25 // 回复消息给 Dealer 26 std::string reply_message = "Hello from Router"; 27 zmq::message_t reply(reply_message.size()); 28 memcpy(reply.data(), reply_message.c_str(), reply_message.size()); 29 30 router.send(identity, ZMQ_SNDMORE); 31 router.send(reply); 32 } 33 34 return 0; 35 }
client.cpp
1 // g++ -o client client.cpp -lzmq -lpthread 2 #include <zmq.hpp> 3 #include <iostream> 4 #include <string> 5 #include <thread> 6 7 void client_task() { 8 zmq::context_t context(1); 9 zmq::socket_t dealer(context, ZMQ_DEALER); 10 11 // 连接到 Router 12 dealer.connect("tcp://localhost:5555"); 13 14 std::string identity = "Client1"; 15 16 // 设置 Dealer 的身份 17 dealer.setsockopt(ZMQ_IDENTITY, identity.c_str(), identity.size()); 18 19 // 发送消息给 Router 20 std::string request_message = "Hello from Dealer"; 21 zmq::message_t request(request_message.size()); 22 memcpy(request.data(), request_message.c_str(), request_message.size()); 23 24 dealer.send(request); 25 26 // 接收 Router 的回复 27 zmq::message_t reply; 28 dealer.recv(&reply); 29 30 std::string reply_str = std::string(static_cast<char*>(reply.data()), reply.size()); 31 32 std::cout << "Received reply from Router: " << reply_str << std::endl; 33 } 34 35 int main() { 36 std::thread client_thread(client_task); 37 client_thread.join(); 38 39 return 0; 40 }