文章标题
原创 翻译 转载
文章内容

pub sub inproc进程内通信

# 简介

进程内传输方式意味着在共享ZMQ context的线程间通过内存方式传输数据。
通讯地址必须保证已经被相同context上的一个socket创建了。
名称必须在与这个socket关联的ZMQ context上是唯一的,而且长度不能大于256字节。

# 示例说明

启动三个线程,一个推送线程,两个订阅线程。

- 推送线程1:推送A,B,C三种消息;
- 订阅线程2:订阅A,B消息;
- 订阅线程3:订阅所有消息;

所以,线程2只接收A,B消息,线程3接收A,B,C消息。

# 代码

```cpp
// reqclient.cpp : This file contains the "main" function. Program execution begins and ends there.
//
// Demonstrates PUB/SUB over the inproc transport: one publisher thread and two
// subscriber threads share a single zmq::context_t and exchange two-part
// (envelope + content) messages.

#include <algorithm>
#include <cassert>
#include <chrono>
#include <iostream>
#include <iterator>
#include <mutex>
#include <string>
#include <thread>
#include <vector>

#include "zmq.hpp"
#include "zmq_addon.hpp"

// Serializes writes to std::cout so lines printed by the two subscriber
// threads are not interleaved.
static std::mutex s_mutex;

// Endpoint the publisher binds to and both subscribers connect to.
static const char *const kEndpoint = "inproc://#1";

// Publisher thread body.
//
// Binds a PUB socket on kEndpoint using the shared context `ctx`, then sends
// the three two-part messages (envelopes "A", "B", "C") once per second,
// forever. Never returns.
void PublisherThread(zmq::context_t *ctx)
{
    // Prepare publisher
    zmq::socket_t publisher(*ctx, zmq::socket_type::pub);
    publisher.bind(kEndpoint);

    // Give the subscribers a chance to connect, so they don't lose any messages
    std::this_thread::sleep_for(std::chrono::milliseconds(20));

    while (true) {
        // Write three messages, each with an envelope and content
        publisher.send(zmq::str_buffer("A"), zmq::send_flags::sndmore);
        publisher.send(zmq::str_buffer("Message in A envelope"));
        publisher.send(zmq::str_buffer("B"), zmq::send_flags::sndmore);
        publisher.send(zmq::str_buffer("Message in B envelope"));
        publisher.send(zmq::str_buffer("C"), zmq::send_flags::sndmore);
        publisher.send(zmq::str_buffer("Message in C envelope"));
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
    }
}

// Shared subscriber loop used by SubscriberThread1 and SubscriberThread2.
//
// ctx    - context shared with the publisher.
// label  - prefix printed before every received message (e.g. "Thread2").
// topics - envelope prefixes to subscribe to; an empty string subscribes to
//          every message.
//
// Connects a SUB socket to kEndpoint, then receives two-part messages forever
// and prints them as "<label>: [<envelope>] <content>". Never returns.
static void RunSubscriber(zmq::context_t *ctx,
                          const std::string &label,
                          const std::vector<std::string> &topics)
{
    // Prepare subscriber
    zmq::socket_t subscriber(*ctx, zmq::socket_type::sub);
    subscriber.connect(kEndpoint);

    for (const std::string &topic : topics) {
        subscriber.set(zmq::sockopt::subscribe, topic.c_str());
    }

    while (true) {
        // Receive all parts of the message
        std::vector<zmq::message_t> recv_msgs;
        zmq::recv_result_t result =
            zmq::recv_multipart(subscriber, std::back_inserter(recv_msgs));
        assert(result && "recv failed");
        assert(*result == 2);

        // Hold the lock only while printing; it is released at the end of
        // this iteration.
        std::lock_guard<std::mutex> lock(s_mutex);
        std::cout << label << ": [" << recv_msgs[0].to_string() << "] "
                  << recv_msgs[1].to_string() << std::endl;
    }
}

// Subscriber thread that opens only the "A" and "B" envelopes; its output is
// labelled "Thread2".
void SubscriberThread1(zmq::context_t *ctx)
{
    RunSubscriber(ctx, "Thread2", {"A", "B"});
}

// Subscriber thread that opens ALL envelopes (empty subscription prefix); its
// output is labelled "Thread3".
void SubscriberThread2(zmq::context_t *ctx)
{
    RunSubscriber(ctx, "Thread3", {""});
}

// Starts the publisher and both subscribers on one shared context and waits
// for them. The worker loops never terminate, so neither does the program.
int main()
{
    // The context is created with 0 as its constructor argument.
    // NOTE(review): presumably the I/O thread count, which would be enough
    // for inproc-only use - confirm against the ZeroMQ documentation.
    zmq::context_t ctx(0);

    auto thread1 = std::thread(PublisherThread, &ctx);

    // Give the publisher a chance to bind, since inproc requires it
    std::this_thread::sleep_for(std::chrono::milliseconds(10));

    auto thread2 = std::thread(SubscriberThread1, &ctx);
    auto thread3 = std::thread(SubscriberThread2, &ctx);
    thread1.join();
    thread2.join();
    thread3.join();

    /*
     * Output:
     *   An infinite loop of a mix of:
     *     Thread2: [A] Message in A envelope
     *     Thread2: [B] Message in B envelope
     *     Thread3: [A] Message in A envelope
     *     Thread3: [B] Message in B envelope
     *     Thread3: [C] Message in C envelope
     */
}
```

文章类别
Python Mobile Android Java Shell Life Database Bug Windows IOS Tools Boost Node.js Mac Product Tips C/C++ Golang Javascript React Qt MQ MongoDB Design Web Linux LLM ChatGPT RAG AI
提交