文章标题 原创 翻译 转载 文章内容 方了方便客户端服务端网络部分的开发,使用boost asio和protobuf封装了一个通信库。 # 特点 - 接口简单 - 高性能 - 自动重连,客户端支持多IP寻址 - 支持心跳 - 包含glog日志库可以直接使用 - 允许设置压缩包 - 库自定义通信协议,用户只需要使用protobuf定义数据结构就可以了 - 服务端多线程处理消息支持有序和无序应答 - 支持请求应答和订阅推送两种模式 - 支持同步请求异步请求 - 线程安全 - 支持跨平台,测试环境编译(windows,linux) # 请求应答代码演示 客户端: ``` #include <iostream> #include "easyio/proto/pb_simple.pb.h" #include "easyio/msgclient.h" #include "easyio/msgbus.h" #include "easyio/tool/util.h" #include "easyio/tool/strutil.h" using namespace easyio; const std::string ADDRESS = "127.0.0.1:5577"; int main(int argc, char* argv[]) { srand((unsigned int)time(NULL)); setting::initlog(argv[0], true, 1); setting::initEnableCompressSize(1024); LOG(INFO) << "client start..."; std::vector<std::string> addressList{ ADDRESS }; MsgClient client(addressList, 0); client.start(); client.waitConnected(); auto reqMsg = std::make_shared<ProtoSimple::PingPongReq>(); reqMsg->set_content(strutil::random_string(100)); std::cout << "send:" << reqMsg->content() << std::endl; client.postMessage(reqMsg, [](int error, const PackagePtr &req, const PackagePtr &rsp) { if (error == 0) { auto rspMsg = static_cast<ProtoSimple::PingPongRsp*>(rsp->msg()); const std::string &rspstr = rspMsg->content(); std::cout << "response:" << rspstr << std::endl; } }); util::sleep(1000); std::cout << "exit" << std::endl; return 0; } ``` 发送异步请求,这是一个PingPong消息,服务端将请求的数据原样返回。请求和应答在同一个代码块,应答使用lambda表达式来处理。 主要步骤是先要定义ProtoSimple::PingPongReq和ProtoSimple::PingPongRsp这两个protobuf结构,然后将req发送出去,就可以获得rsp了,必须要配对,有请求就应该有应答,即使应答的内容为空。 服务端: ``` #include <iostream> #include <easyio/proto/pb_simple.pb.h> #include <easyio/msgclient.h> #include <easyio/msgserver.h> #include <easyio/msgbus.h> #include <easyio/tool/util.h> using namespace easyio; int main(int argc, char* argv[]) { setting::initlog(argv[0], true); setting::initEnableCompressSize(); LOG(INFO) << "server start..."; MsgServer server(5577); server.addHandleMessage(TYPE_NAME(ProtoSimple::PingPongReq), [](const SessionPtr &session, const PackagePtr &request) { auto req = static_cast<ProtoSimple::PingPongReq*>(request->msgPtr.get()); auto rsp = std::make_shared<ProtoSimple::PingPongRsp>(); rsp->set_content(req->content()); session->replyMessage(request, rsp); }); server.run(); LOG(INFO) << "server exit!!!"; return 0; } ``` 服务端看起来更简单,增加PingPongReq的消息处理,获取请求的数据将其赋值给应答,然后通过session调用replyMessage。 # 推送订阅代码演示 订阅方: ``` #include <iostream> #include "easyio/proto/pb_simple.pb.h" #include "easyio/msgclient.h" #include "easyio/msgbus.h" #include "easyio/tool/util.h" #include "easyio/tool/strutil.h" using namespace easyio; const std::string ADDRESS = "127.0.0.1:5577"; int main(int argc, char* argv[]) { setting::initlog(argv[0], true, 1); setting::initEnableCompressSize(1024); LOG(INFO) << "client start..."; std::vector<std::string> addressList{ ADDRESS }; MsgClient client(addressList, 0); client.addHandlePublish(TYPE_NAME(ProtoSimple::ServerInfoPub), [](int error, const MessagePtr &pubMsgPtr) { if (error == 0) { auto info = static_cast<ProtoSimple::ServerInfoPub*>(pubMsgPtr.get()); std::cout << "pub:" << info->hello() << std::endl; } }); client.start(); client.waitConnected(); util::sleep(10000); std::cout << "exit" << std::endl; return 0; } ``` 使用addHandlePublish处理订阅的消息,就相当于你订阅了这个消息,在client start之后就会自动向服务端发送订阅的消息。 推送方: ``` #include <iostream> #include <easyio/proto/pb_simple.pb.h> #include <easyio/msgclient.h> #include <easyio/msgserver.h> #include <easyio/msgbus.h> #include <easyio/tool/util.h> using namespace easyio; int main(int argc, char* argv[]) { setting::initlog(argv[0], true); setting::initEnableCompressSize(); LOG(INFO) << "server start..."; MsgServer server(5577); std::thread t([&server]() { while (1) { auto info = std::make_shared<ProtoSimple::ServerInfoPub>(); std::string strtime = util::getFormatTime(util::currentMillisecond()); std::cout << "pub time:" << strtime << std::endl; info->set_hello(strtime); server.publishMessage(info); util::sleep(1000); } }); t.detach(); server.run(); LOG(INFO) << "server exit!!!"; return 0; } ``` 推送方每秒推送当前时间给订阅方。 注意:server run之后才能进行推送,而run是一个io serivce会阻塞当前线程,所以这里是单独开一个线程来循环推送消息。 # 项目结构介绍 我给这个库取了个名字:easyio ``` easyio internal // 核心部分封装了通信逻辑,协议,线程池,任务管理 proto // 内部protobuf协议定义,订阅、心跳协议等 tool // 工具类 msgclient.h // 客户端调用的外部接口类 msgserver.h // 服务端调用的外部接口类 ``` # 自定义协议 包有包头和包体两部分组合,如下: ``` #pragma pack(push,1) struct PacHeader { static const char flag0; static const char flag1; enum MsgType { REQREP = 1, PUBSUB, DELIVER, }; char flag[2]{flag0, flag1}; short msgType{ 0 }; short typeNameLen{ 0 }; union { short extInfo{ 0 }; struct { unsigned char iszip : 1; unsigned char isorder : 1; }; }; int msgId{ 0 }; int msgSize{ 0 }; int pacSize{ 0 }; }; #pragma pack(pop) const int kPackageHeaderSize = sizeof(PacHeader); const int kMsTimeout = 5000; struct Package { PacHeader header; std::string typeName; #ifdef MSGBUS std::vector<char> body; #endif MessagePtr msgPtr; inline google::protobuf::Message* msg() { return msgPtr.get(); } }; ``` ## 包头 包头PacHeader固定大小20个字节,字节分别是: 1:flag0,固定标识(用来分隔不同的包) 2: flag1,固定标识 3~4:msgType,消息类型MsgType 5~6:typeNameLen,protobuf名长度 7~8:extInfo,扩展信息,包括iszip是否压缩,isorder是否是order message 9~12:msgId,消息ID 13~16:msgSize,消息大小 17~20:pacSize,包的大小 ## 包体 包括:typename + msgbody typename:protobuf的名字 msgbody:protobuf的二进制内容 ## 包的编解码 编码将消包(包头+包体)转换为二进制流存入buffer类。 解码将网络过来的二进制流转换为包结构 需要注意的是,由于包头格式是固定的解码没什么问题,但是包体是各种数据结构,要一个一个的解析太麻烦了,而且不好扩展,这里就体现protobuf的优势来了。 protobuf支持反射可以根据protobuf类名创建出protobuf对象,然后调用protobuf对象的方法名传入二进制流参数进行初始化,方法如下: ``` google::protobuf::Message* ProtoHelp::createMessage(const std::string & typeName) { google::protobuf::Message *message = NULL; const google::protobuf::Descriptor* desc = google::protobuf::DescriptorPool::generated_pool()->FindMessageTypeByName(typeName); if (desc) { const google::protobuf::Message *prototype = google::protobuf::MessageFactory::generated_factory()->GetPrototype(desc); if (prototype) { message = prototype->New(); } } return message; } google::protobuf::Message *msg = createMessage(typeName); parseResult = msg->ParseFromArray("二进制流"); ``` 编码代码: ``` bool ProtoHelp::encode(const PackagePtr &package, BufferPtr &buffer) { if (package->typeName.empty()) { return false; } std::string content; content.resize(package->msgPtr->ByteSizeLong()); if (package->msgPtr->SerializePartialToArray(&content[0], content.size())) { // need compress if (ProtoHelp::enableCompressSize() >= 0 && (int)content.size() > ProtoHelp::enableCompressSize()) { unsigned long bufferSize = compressBound(content.size()); std::string compressedBuffer(bufferSize, 0); int errcode = compress((unsigned char*)&compressedBuffer[0], &bufferSize, (unsigned char*)&content[0], content.size()); if (errcode == Z_OK) { LOG(INFO) << "compress success from:" << content.size() << ", to:" << bufferSize; package->header.iszip = 1; content.assign(&compressedBuffer[0], bufferSize); } else { LOG(ERROR) << "compress error:" << errcode << ", size:" << content.size(); } } package->typeName = package->msgPtr->GetTypeName(); package->header.typeNameLen = (short)package->typeName.size(); package->header.msgSize = package->msgPtr->ByteSizeLong(); package->header.pacSize = kPackageHeaderSize + package->typeName.size() + content.size(); bufferAppendHeader(*buffer.get(), package->header); buffer->append(&package->typeName[0], package->header.typeNameLen); buffer->append(content); return true; } return false; } ``` 解码代码: ``` PackagePtr ProtoHelp::decode(Buffer &buf) { if (!gotoFlag(buf)) { return PackagePtr(); } PacHeader header; bufferPeekHeader(buf, header); // 数据不够等待下次读取更多的数据 if (buf.readableBytes() < (std::size_t)header.pacSize) { return PackagePtr(); } buf.retrieve(kPackageHeaderSize); if (header.pacSize < 0 || header.pacSize > kMaxPackageLen) { LOG(INFO) << "pac size error:" << header.pacSize; return decode(buf); } if (header.typeNameLen <= 0 || header.typeNameLen > 1024) { LOG(INFO) << "type name len error:" << header.typeNameLen; return decode(buf); } std::string typeName = buf.retrieveAsString(header.typeNameLen); google::protobuf::Message *msg = createMessage(typeName); if (!msg) { LOG(ERROR) << "create message failed:" << typeName; return decode(buf); } bool parseResult = false; int remainSize = header.pacSize - kPackageHeaderSize - header.typeNameLen; if (header.iszip) { unsigned long bufferSize = header.msgSize; std::string unCompressedBuffer(bufferSize, 0); int errcode = uncompress((unsigned char*)&unCompressedBuffer[0], &bufferSize, (const unsigned char*)buf.peek(), remainSize); if (errcode == Z_OK) { LOG(INFO) << "uncompress success, from:" << remainSize << ", to:" << unCompressedBuffer.size(); header.iszip = false; header.pacSize = kPackageHeaderSize + header.typeNameLen + header.msgSize; parseResult = msg->ParseFromArray(&unCompressedBuffer[0], bufferSize); } else { LOG(ERROR) << "uncompress error:" << errcode << ", size:" << bufferSize; } } else { parseResult = msg->ParseFromArray(buf.peek(), header.msgSize); } if (parseResult) { buf.retrieve(remainSize); auto result = std::make_shared<Package>(); result->header = header; result->typeName = typeName; result->msgPtr = MessagePtr(msg); return result; } LOG(ERROR) << "decode failed:" << typeName; return PackagePtr();; } ``` # client封装 客户端只需要提供: * 发送消息(同步、异步还有一个顺序消息),处理应答接口 * 设置连接成功后的回调 * 设置订阅推送消息的处理 ``` namespace easyio { class PublishHandler; class MsgClient { public: explicit MsgClient(const std::vector<std::string> &addressList, int heartbeatSeconds = 5); ~MsgClient(); MsgClient(const MsgClient&) = delete; MsgClient& operator=(const MsgClient&) = delete; void setConnectNotify(const std::function<void(bool)> &cb); void addHandlePublish(const std::string &typeName, const PublishFunc &func); void start(); void stop(); bool waitConnected(int timeoutSecond = 3); int sendMessage(const MessagePtr &msgPtr, MessagePtr &rspPtr, int msTimeout = kMsTimeout); int postMessage(const MessagePtr &msgPtr, const Response &res, int msTimeout = kMsTimeout); int postOrderMessage(const MessagePtr &msgPtr, const Response &res, int msTimeout = kMsTimeout); private: std::shared_ptr<PublishHandler> publishHandler_; D_PRIVATE(AsioClient); }; } ``` # 服务端封装 服务端需要处理: * 处理请求应答 * 提供推送消息的接口 ``` class MsgServer { public: explicit MsgServer(unsigned short port); explicit MsgServer(unsigned short port, unsigned int workSize, unsigned int poolSize); ~MsgServer(); MsgServer(const MsgServer&) = delete; MsgServer& operator=(const MsgServer&) = delete; void run(); void stop(); void addMethodHanlder(const MsgHandlerPtr &handler); void addHandleMessage(const std::string &protoName, const Task &task); void publishMessage(const MessagePtr &msg); private: std::vector<MsgHandlerPtr> handlers_; D_PRIVATE(AsioServer) }; ``` # session管理 每个客户端与服务端建立的连接我们就认为是一个session,用SessionRoom来管理session的新增和删除以及订阅推送消息。 ``` class Session : public Publisher, public std::enable_shared_from_this<Session> { public: explicit Session(tcp::socket socket, SessionRoom &room, TaskManager &taskManager); Session(const Session&) = delete; Session& operator=(const Session&) = delete; ~Session(); void start(); void replyMessage(const PackagePtr &req, const MessagePtr &rspMsg); std::string remoteEndpoint() const; void publishMessage(const MessagePtr &msg) override; void deliverPackage(const PackagePtr &pac) override; int id() const override; void doRead(); void doWrite(boost::system::error_code ec, std::size_t length); void writeBuffer(BufferPtr buffer); void postPackage(const PackagePtr &pack); private: std::string endpoint_; std::unique_ptr<SessionData> d; SessionRoom &room_; TaskManager &taskManager_; }; ``` # 任务管理 对于服务端而言,它面对的是成千上万的客户端连接以及大量的消息,既然有消息过来那就是要做事情,也就是有任务了,要想高效的处理就需要多线程。 对于底层的连接以及消息收发的性能有boost的asio来保证,这里不用我们来操心。 我们需要关注的业务处理的性能,也就是任务管理了 ``` namespace easyio { class ThreadPool; class Worker; typedef std::shared_ptr<Worker> WorkerPtr; typedef std::shared_ptr<ThreadPool> ThreadPoolPtr; class TaskManager { public: class TaskManagerPrivate { public: TaskManagerPrivate(unsigned int workSize, unsigned int poolSize); void init(unsigned int workSize, unsigned int poolSize); ThreadPoolPtr pool_; std::map<std::string, Task> handler_; std::vector<WorkerPtr> workers_; }; TaskManager(unsigned int workSize, unsigned int poolSize); TaskManager(TaskManager&) = delete; TaskManager& operator=(TaskManager&) = delete; void addHandleTask(const std::string &protoName, const Task &task); void setPreHandleTask(const Task &task); void handleMessage(const PackagePtr &msgPtr, const SessionPtr &sessionPtr); void destory(); private: Task preTask_{ nullptr }; D_PRIVATE(TaskManagerPrivate) }; } ``` 任务管理用到了线程池,多线程来处理任务,虽然很高效但是会带来一个问题,就是处理消息的顺序和应答的顺序并不是顺序的。 比如请求的顺序是:A,B,C 应答的顺序可能是:C,B,A 因为你只有处理完了消息才能应答,任务处理花费的时间越长应答越慢。 当然,这里说的是同一个session,不同session请求消息的顺序这里不考虑。 为了解决这个问题,对于要保证顺序的消息,我们根据session id把它路由到相同的线程处理,缺点是如果某个任务花费的时间较长会影响后续任务的执行,所以对于order message要尽量少用。 # Buffer二进制流处理 Buffer类可以很方便的处理二进制流,自动扩容,基本类型的读写,读写指定大小的字节数。 要注意网络字节序与主机字节序的转换 # 日志 日志打印使用glog库 # 包压缩 使用zlib库解压缩,当包的大小超过设置的值时。太小的包不值得压缩。 # msgbus 项目里有一些msgbus相关的代码,是用来实验消息代理转发的,这里可以不考虑。 # 源码地址 [https://gitee.com/tujiaw/easyio.git](https://gitee.com/tujiaw/easyio.git) 有兴趣的Star一下吧。 文章类别 Python Mobile Android Java Shell Life Database Bug Windows IOS Tools Boost Node.js Mac Product Tips C/C++ Golang Javascript React Qt MQ MongoDB Design Web Linux LLM ChatGPT RAG AI 提交