文章标题 原创 翻译 转载 文章内容 ![](https://ningto.com/upload/20210526-ntscreenshot-163324.png) 如图所示:一个工作派发器(dispatcher),多个工作者(worker),一个结果收集器(collector)。 dispatcher不断的派发工作给多个worker处理,worker将处理完成的结果发送给collector,collector来统计结果。 注意点: * bind和connect不会阻塞; * send和recv会阻塞; * 只有send或者只有recv一方时会阻塞; * 如果send和recv是配对的,暂时不会阻塞,发送和接收都有个缓冲队列,如果缓冲队列满了还是会阻塞; * 缓冲队列设置,setsockopt(zmq.SNDHWM, 2000),默认是1000个消息; * 当send和recv速度不匹配时,速度快的一方的缓冲队列满了就会阻塞,缓冲队列被另一方消费后就又恢复畅通了; 代码: dispatcher ``` #include "zmq.hpp" #include <string> #include <iostream> #include <algorithm> #include <random> #include <thread> int main() { std::default_random_engine engine; std::uniform_int_distribution<int> u(0, 100); try { zmq::context_t context(1); zmq::socket_t sender(context, zmq::socket_type::push); sender.bind("tcp://127.0.0.1:5557"); zmq::socket_t signaler(context, zmq::socket_type::push); signaler.connect("tcp://127.0.0.1:5558"); const int TASK_COUNT = 1000; std::string strCount = std::to_string(TASK_COUNT); signaler.send(zmq::const_buffer(strCount.c_str(), strCount.size())); std::cout << "event count:" << strCount << std::endl; for (int i = 0; i < TASK_COUNT; i++) { int n = u(engine); std::cout << "event i:" << i << ", data:" << n << std::endl; std::string str = std::to_string(n); zmq::const_buffer msg(str.c_str(), str.size()); sender.send(msg); } } catch (zmq::error_t &e) { std::cerr << "exception:" << e.what() << std::endl; } zmq_sleep(1); system("pause"); return 0; } ``` worker ``` #include <iostream> #include <zmq.hpp> #include <thread> int main() { try { zmq::context_t context(1); zmq::socket_t receiver(context, zmq::socket_type::pull); receiver.connect("tcp://127.0.0.1:5557"); zmq::socket_t sender(context, zmq::socket_type::push); sender.connect("tcp://127.0.0.1:5558"); int handleCount = 0; while (1) { zmq::message_t msg; receiver.recv(msg); std::string str = msg.to_string(); std::cout << "sleep millisecond:" << str << std::endl; std::this_thread::sleep_for(std::chrono::milliseconds(atoi(str.c_str()))); sender.send(zmq::const_buffer(str.c_str(), str.size())); ++handleCount; std::cout << "handle count:" << handleCount << std::endl; } } catch (zmq::error_t &e) { std::cout << e.what() << std::endl; } system("pause"); return 0; } ``` collector ``` int main() { try { zmq::context_t context(1); zmq::socket_t receiver(context, zmq::socket_type::pull); receiver.bind("tcp://127.0.0.1:5558"); int total = 0; int i = 0; while (1) { zmq::message_t buffer; receiver.recv(buffer); std::string str = buffer.to_string(); total += atoi(str.c_str()); std::cout << "i:" << i++ << ", msg:" << buffer.to_string() << ", total:" << total << std::endl; } } catch (zmq::error_t &e) { std::cout << e.what() << std::endl; } system("pause"); return 0; } ``` 文章类别 Python Mobile Android Java Shell Life Database Bug Windows IOS Tools Boost Node.js Mac Product Tips C/C++ Golang Javascript React Qt MQ MongoDB Design Web Linux LLM ChatGPT RAG AI 提交