删除文章

确定要删除这篇文章吗?

取消
确定

多线程锁key处理

     阅读(53)  2020-08-04 09:24:22

目的是要保证多个线程处理同一个key的消息时是线程安全的,同时是顺序的。

  1. 收到网络过来的消息,将消息缓存到total msg cache里
  2. 解析业务消息找到每条数据的key(可能一条或多条)存储在set中
  3. 将所有key进行hash,对最大线程数进行取余,获取所有线程索引std::set indexList
  4. 如果消息没有index即没有key,就轮询分发给otherThread处理
  5. 有index的消息,根据index分发给线程,如果一个消息有多个index,对应的一个消息会分发给多个线程(keyThread)
  6. 每个keyThread不断的从线程队列里取出消息ID来处理(线程队列只保留消息ID)
  7. 根据消息ID到total msg cache里去查找消息
  8. 如果没有找到,说明已经被其他线程消费掉了(因为一个消息有多个key的话会分配给多个thread)
  9. 如果找到了,递减消息的索引数,索引数大于0就等待,索引数为0就处理消息然后从total msg cache中删除消息(一个消息只能被处理一次),最后通知其他线程
  10. 其他线程收到通知后再次去cache查找消息重复步骤7,直到消息被处理掉
  11. 最后每个线程不断重复从线程队列取消息

关键点: 如果一个消息被分配给了n个线程那它的索引数就是n,为了保证有相同key的消息的处理顺序,当找到了索引数大于0的消息时当前线程必须等待,要保证其他线程中当前消息之前的消息都被处理完也就是说索引递减为0了才能处理。

如果一个消息只有一个key,对key进行hash将消息分配到指定的线程就能保证相同的key不会被多个线程处理,同时是顺序的

如果一个消息有多个key,递减索引数以及等待索引数为0,能保证消息的顺序执行

keysync 代码:

// .h
typedef std::shared_ptr<sdbus::Message> MessagePtr;

class Service;
// 消息简单包装
class MessagePack {
public:
    MessagePack();
    MessagePack(Service *service, const MessagePtr &msg, bool ignoreLog, const std::set<int> &indexs);
    MessagePack(const MessagePack&);
    MessagePack& operator=(const MessagePack&);

    bool ignoreLog() const;
    const std::set<int>& indexList() const;
    bool isMultiIndex() const;
    int indexCountSub();
    int indexCount() const;
    MessagePtr msg() const;
    const std::string& id() const;
    void handle();
    time_t msStart() const;

private:
    bool ignoreLog_;
    MessagePtr msg_;
    std::set<int> indexs_;
    std::string id_;
    boost::atomic_int  indexCount_;
    Service *service_;
    time_t msStart_;
};

//////////////////////////////////////////////////////////////////////////
// 网络消息缓存
class NetMessageCache {
public:
    NetMessageCache();
    void add(const MessagePack &msg);
    bool getAndWait(const std::string &id, MessagePack &msg);
    void delAndNotify(const std::string &id);
    void printLog() const;

private:
    boost::mutex mutex_;
    boost::condition_variable cond_;
    std::map<std::string, MessagePack> allMsg_;

private:
    NetMessageCache(const NetMessageCache&);
    NetMessageCache& operator=(const NetMessageCache&);
};
//////////////////////////////////////////////////////////////////////////
// 线程工作者
class Worker {
public:
    Worker(int i, NetMessageCache *msgCache);
    int index() const;
    void addMsgId(const std::string &id);
    void join();
    boost::thread::id threadId() const;

    void onHandle();

private:
    Worker(const Worker&);
    Worker& operator=(const Worker&);

    int index_;
    bool running_;
    std::queue<std::string> msgIdQueue_;
    boost::mutex mutex_;
    boost::condition_variable cond_;
    NetMessageCache* msgCache_;
    boost::thread thread_;
};

//////////////////////////////////////////////////////////////////////////
// 工作调度中心
// 不同的消息分发给不同的worker
class WorkerCenter
{
public:
    static WorkerCenter* GetInstance();
    void init(int keyThreadCount, int otherThreadCount);
    void addMsg(const MessagePack &msg);
    int keyThreadCount() const { return keyWorkerList_.size(); }
    int otherThreadCount() const { return otherWorkerList_.size(); }

private:
    WorkerCenter();
    ~WorkerCenter();
    WorkerCenter(const WorkerCenter&);
    WorkerCenter& operator=(const WorkerCenter&);

    std::vector<std::shared_ptr<Worker> > keyWorkerList_;
    std::vector<std::shared_ptr<Worker> > otherWorkerList_;
    std::unique_ptr<NetMessageCache> msgCache_;
    boost::atomic_int otherWorkerIndex_;
};
// .cpp
boost::hash<std::string> str_hash;

MessagePack::MessagePack()
    : ignoreLog_(false), service_(NULL)
{
    indexCount_.store(0);
    msStart_ = GetCurrentTimeMilliSec();
}

MessagePack::MessagePack(Service *service, const MessagePtr &msg, bool ignoreLog, const std::set<int> &indexs)
    : ignoreLog_(ignoreLog), service_(service), msg_(msg), indexs_(indexs)
{
    id_ = msg_->GetMessageID();
    indexCount_.store(indexs_.size());
    msStart_ = GetCurrentTimeMilliSec();
}

MessagePack::MessagePack(const MessagePack& other)
{
    ignoreLog_ = other.ignoreLog_;
    msg_ = other.msg_;
    indexs_ = other.indexs_;
    id_ = other.id_;
    indexCount_.store(other.indexCount_);
    service_ = other.service_;
    msStart_ = other.msStart_;
}

MessagePack& MessagePack::operator = (const MessagePack& other)
{
    if (this == &other) {
        return *this;
    }
    ignoreLog_ = other.ignoreLog_;
    msg_ = other.msg_;
    indexs_ = other.indexs_;
    id_ = other.id_;
    indexCount_.store(other.indexCount_);
    service_ = other.service_;
    msStart_ = other.msStart_;
    return *this;
}

bool MessagePack::ignoreLog() const
{
    return ignoreLog_;
}

const std::set<int>& MessagePack::indexList() const
{
    return indexs_;
}

bool MessagePack::isMultiIndex() const
{
    return indexs_.size() > 1;
}

int MessagePack::indexCountSub()
{
    return --indexCount_;
}

int MessagePack::indexCount() const
{
    return indexCount_.load();
}

MessagePtr MessagePack::msg() const
{
    return msg_;
}

const std::string& MessagePack::id() const
{
    return id_;
}

void MessagePack::handle()
{
    if (service_) {
        service_->processReqMsg(msg_.get());
        if (!ignoreLog_) {
            LOGGER_INFO("message pack handle finished, type:" << msg_->GetType().c_str() << ", id:" << id_ <<", cost:" << (GetCurrentTimeMilliSec() - msStart_) << "ms");
        }
    } else {
        LOGGER_ERROR("SERVICE ERROR");
    }
}

time_t MessagePack::msStart() const
{
    return msStart_;
}
//////////////////////////////////////////////////////////////////////////

NetMessageCache::NetMessageCache()
{
}

void NetMessageCache::add(const MessagePack &msg)
{
    boost::unique_lock<boost::mutex> lock(mutex_);
    allMsg_[msg.id()] = msg;
    if (allMsg_.size() > 1000) {
        LOGGER_WARN("NetMessageCache::add service busy, msgCacheSize:" << allMsg_.size());
    }
}

bool NetMessageCache::getAndWait(const std::string &id, MessagePack &msg)
{
    const int WAIT_TIMEOUT_SECOND = 60;
    boost::unique_lock<boost::mutex> lock(mutex_);
    bool isFirst = true;
    bool isExist = true;
    while (isExist) {
        std::map<std::string, MessagePack>::iterator iter = allMsg_.find(id);
        if (iter == allMsg_.end()) {
            isExist = false;
            continue;
        }

        // 不是多index,直接返回然后删除
        if (!iter->second.isMultiIndex()) {
            msg = iter->second;
            allMsg_.erase(iter);
            return true;
        }

        int count = iter->second.indexCount();
        if (isFirst) {
            // 只考虑第一次进入时,因为这里会执行多次,会造成index count多次递减
            // 如果index count递减后为0说明其他线程前面的消息已经执行完成了
            // 保证有相同key的消息是顺序执行的
            isFirst = false;
            count = iter->second.indexCountSub();
            if (count == 0) {
                msg = iter->second;
                return true;
            }
        }
        // count大于0时要等待其他线程执行完成
        time_t waitStartTime = GetCurrentTimeMilliSec();
        cond_.timed_wait(lock, boost::get_system_time() + boost::posix_time::seconds(WAIT_TIMEOUT_SECOND));
        if (GetCurrentTimeMilliSec() - waitStartTime > (WAIT_TIMEOUT_SECOND - 1) * 1000) {
            LOGGER_ERROR("NetMessageCache wait timeout " << WAIT_TIMEOUT_SECOND << ", id:" << id << ", count:" << count);
            printLog();
        }
    }
    return false;
}

// 这个函数可以优化下,去掉index count判断,因为外面已经有了index count 小于等于1的消息不应该调用这个函数
void NetMessageCache::delAndNotify(const std::string &id)
{
    boost::unique_lock<boost::mutex> lock(mutex_);
    std::map<std::string, MessagePack>::iterator iter = allMsg_.find(id);
    if (iter != allMsg_.end()) {
        // 能够删除的消息key count应该都是0
        if (iter->second.indexCount() != 0) {
            LOGGER_ERROR("NetMessageCache ERROR ref count:" << iter->second.indexCount());
        }
        bool isMultiIndex = iter->second.isMultiIndex();
        allMsg_.erase(iter);
        // 只有多index的消息才会造成阻塞
        if (isMultiIndex) {
            cond_.notify_all();
        }
    }
}

void NetMessageCache::printLog() const
{
    std::map<std::string, MessagePack>::const_iterator iter = allMsg_.cbegin();
    while (iter != allMsg_.cend()) {
        const MessagePack &msg = iter->second;
        LOGGER_INFO("NetMessageCache msgid:" << iter->first << ", indexCount:" << msg.indexList().size()
            << ", curIndexCount:" << msg.indexCount() << ", cost:" << (GetCurrentTimeMilliSec() - msg.msStart()) << "ms");
        ++iter;
    }
    LOGGER_INFO("-----------------------------------------");
}
//////////////////////////////////////////////////////////////////////////
Worker::Worker(int i, NetMessageCache *msgCache)
    : index_(i), running_(true), msgCache_(msgCache)
    , thread_(boost::bind(&Worker::onHandle, this))
{
}

int Worker::index() const
{
    return index_;
}

void Worker::addMsgId(const std::string &id)
{
    boost::unique_lock<boost::mutex> lock(mutex_);
    msgIdQueue_.push(id);
    int count = msgIdQueue_.size();
    cond_.notify_one();
    if (count > 100) {
        LOGGER_WARN("Worker::addMsgId thread busy index:" << index_ << ", threadId:" << threadId() << ", queueSize:" << msgIdQueue_.size());
    }
}

void Worker::join() {
    running_ = false;
    if (thread_.joinable()) {
        // 线程可能被阻塞,join会一直等待
        thread_.join();
    }
}

boost::thread::id Worker::threadId() const
{
    return thread_.get_id();
}

void Worker::onHandle()
{
    while (running_) {
        std::string msgId;
        {
            boost::unique_lock<boost::mutex> lock(mutex_);
            while (msgIdQueue_.empty()) {
                cond_.wait(lock);
            }
            msgId = msgIdQueue_.front();
            msgIdQueue_.pop();
        }

        if (!msgId.empty()) {
            MessagePack msg;
            // 消息没有找到被其他线程消费掉了
            if (msgCache_->getAndWait(msgId, msg)) {
                // 消息处理
                msg.handle();
                // 从缓存中删掉消息,通知其他线程消息被消费掉了
                if (msg.isMultiIndex()) {
                    msgCache_->delAndNotify(msgId);
                }
            }
        }
    }
}
//////////////////////////////////////////////////////////////////////////
WorkerCenter::WorkerCenter()
    : msgCache_(new NetMessageCache()), otherWorkerIndex_(0)
{
}

WorkerCenter::~WorkerCenter()
{
    for (std::vector<std::shared_ptr<Worker> >::iterator iter = keyWorkerList_.begin(); iter != keyWorkerList_.end(); ++iter) {
        (*iter)->join();
    }
    for (std::vector<std::shared_ptr<Worker> >::iterator iter = otherWorkerList_.begin(); iter != otherWorkerList_.end(); ++iter) {
        (*iter)->join();
    }
}

WorkerCenter* WorkerCenter::GetInstance()
{
    static WorkerCenter s_inst;
    return &s_inst;
}

void WorkerCenter::init(int keyThreadCount, int otherThreadCount)
{
    LOGGER_INFO("WorkerCenter init, keyThreadCount:" << keyThreadCount << ", otherThreadCount:" << otherThreadCount);
    for (int i = 0; i < keyThreadCount; i++) {
        std::shared_ptr<Worker> worker = std::make_shared<Worker>(i, msgCache_.get());
        keyWorkerList_.push_back(worker);
        LOGGER_INFO("WorkerCenter init, key thread index:" << worker->index() << ", id:" << worker->threadId());
    }
    for (int i = 0; i < otherThreadCount; i++) {
        std::shared_ptr<Worker> worker = std::make_shared<Worker>(i, msgCache_.get());
        otherWorkerList_.push_back(worker);
        LOGGER_INFO("WorkerCenter init, other thread index:" << worker->index() << ", id:" << worker->threadId());
    }
}

void WorkerCenter::addMsg(const MessagePack &msg)
{
    msgCache_->add(msg);
    const std::set<int>& indexList = msg.indexList();
    if (indexList.empty()) {
        // 无index的消息
        // 线程轮询处理
        int index = (otherWorkerIndex_++) % otherWorkerList_.size();
        otherWorkerList_[index]->addMsgId(msg.id());
        if (!msg.ignoreLog()) {
            LOGGER_INFO("WorkerCenter::addMsg other thread id:" << msg.id() << ", index:" << index);
        }
    } else {
        // 有index的消息
        // 将消息分发到index对应的线程
        std::stringstream ss;
        for (std::set<int>::iterator iter = indexList.begin(); iter != indexList.end(); ++iter) {
            keyWorkerList_[*iter]->addMsgId(msg.id());
            if (!msg.ignoreLog()) {
                ss << *iter << " ";
            }
        }
        if (!msg.ignoreLog()) {
            LOGGER_INFO("WorkerCenter::addMsg key thread id:" << msg.id() << ", index:" << ss.str());
        }
    }
}

外部调用:WorkerCenter::addMsg

文章评论

Keep it simple,stupid
文章数
338
今日访问
1817
今日IP数
643
最近评论

liangzi: 不错 谢谢分享
tujiaw: registerThreadInactive:如果当前没有激活的线程,就去激活线程,让等待的线程去执行任务。
hgzzx: 佩服佩服。 请教:registerThreadInactive的作用是什么?
xuehaoyun: 很不错,来围观一下
tujiaw: 抱歉csdn code服务关闭了,这个代码我也找不到了
于淞: 你好,这个文章的源码能分享一下吗,songsong9181@163.com,谢谢了 上面的写错了
回到顶部