std::thread::hardware_concurrency()

返回硬件线程上下文的数量,通常是CPU内核数量;当该值无法确定时可能返回0

template numeric_limits

根据当前平台,获取指定类型的信息

std::numeric_limits<int>::min() 获取int最小值;std::numeric_limits<unsigned long>::max() 获取unsigned long最大值

std::unique_ptr

管理一个指针对象,在出作用域后自动销毁对象;它独占所管理的对象,不允许拷贝(拷贝构造和拷贝赋值被禁用),只能通过std::move转移所有权,因此比裸指针更安全

std::upper_bound

std::upper_bound (v.begin(), v.end(), 20)
返回指向第一个大于20的元素的迭代器(要求区间已按升序排好),没找到返回v.end()

std::lower_bound

std::lower_bound(v.begin(), v.end(), 20)
返回指向第一个不小于20(即大于或等于20)的元素的迭代器(要求区间已按升序排好),没找到返回v.end()

std::unique_lock

构造的时候加锁,析构的时候解锁,保证加解锁是成对的不会漏掉

源码如下,做了些注释: runnable.h

#ifndef RUNNABLE_H
#define RUNNABLE_H

// A unit of work that the thread pool can execute. Concrete tasks derive from
// this class and implement run().
//
// The private counter m_ref doubles as the auto-delete switch: -1 means
// auto-deletion is disabled, any other value means it is enabled. A freshly
// constructed task starts with the counter at 0, i.e. auto-deletion enabled.
class Runnable {
    // The pool classes adjust m_ref directly.
    friend class ThreadPool;
    friend class ThreadPoolPrivate;
    friend class ThreadPoolThread;

public:
    Runnable() : m_ref(0) {}
    virtual ~Runnable() {}

    // The task body; must be supplied by the concrete subclass.
    virtual void run() = 0;

    // True unless auto-deletion has been switched off (counter equals -1).
    bool autoDelete() const { return m_ref != -1; }

    // Enables auto-deletion (counter reset to 0) or disables it (counter set to -1).
    void setAutoDelete(bool v)
    {
        if (v) {
            m_ref = 0;
        }
        else {
            m_ref = -1;
        }
    }

private:
    int m_ref;
};

#endif // RUNNABLE_H

threadpoolthread.h

#ifndef THREADPOOLTHREAD_H
#define THREADPOOLTHREAD_H

#include <condition_variable>
#include <thread>

// Forward declarations are sufficient for the pointer members of
// ThreadPoolThread.
//
// std::thread is deliberately NOT forward-declared: adding declarations to
// namespace std is undefined behavior, and on standard libraries that keep
// their types in an inline namespace it makes the name std::thread ambiguous.
// The real <thread> header is included instead.
class Runnable;
class ThreadPoolPrivate;

// Worker object bound to one thread of the pool. Its members are public
// because ThreadPoolPrivate reads and writes them directly.
class ThreadPoolThread {
public:
    // Creates a worker for the given pool. Marked explicit so that a
    // ThreadPoolPrivate* can never be converted into a worker implicitly.
    explicit ThreadPoolThread(ThreadPoolPrivate* manager);

    // Thread entry point: invoked as soon as the thread has been started.
    void operator()(void);

    // Decrements the pool's active-thread counter and signals
    // noActiveThreads when the counter reaches zero.
    void registerThreadInactive(void);

    // Signalled by the pool to wake this worker while it waits for work.
    std::condition_variable runnableReady;
    // Pool state this worker operates on.
    ThreadPoolPrivate* manager;
    // Task handed over by the pool before the thread is started; may be null.
    Runnable* runnable;
    // Thread that executes operator(); assigned by the pool.
    std::thread* thread;
};

#endif // THREADPOOLTHREAD_H

threadpoolthread.cpp

#include <mutex>
#include <thread>
#include "threadpoolthread.h"
#include "threadpool_p.h"
#include "runnable.h"

ThreadPoolThread::ThreadPoolThread(ThreadPoolPrivate* manager)
    : manager(manager), runnable(nullptr), thread(nullptr)
{
}

// Worker main loop. Runs the task handed over in `runnable`, then keeps
// pulling tasks from the pool's queue. When the queue is drained the worker
// parks itself in waitingThreads for up to expiryTimeout milliseconds; if it
// is not picked up in that time (or too many threads are active) it moves
// itself to expiredThreads and returns.
//
// The pool mutex is held for the whole function except while a task's run()
// is executing.
void ThreadPoolThread::operator()(void)
{
    std::unique_lock<std::mutex> locker(this->manager->mutex);
    while (true) {
        // Take ownership of the task that was handed to this worker (may be null).
        Runnable* r    = this->runnable;
        this->runnable = nullptr;

        do {
            if (r) {
                // Sample the flag before run(): it is evaluated once, prior to
                // releasing the lock.
                const bool auto_delete = r->autoDelete();

                // Run the task without holding the pool mutex.
                locker.unlock();
                r->run();
                locker.lock();

                // Drop this execution's reference; delete the task when the
                // count reaches zero.
                if (auto_delete && !--r->m_ref) {
                    delete r;
                }
            }

            // Stop taking work if the pool currently has more active threads
            // than allowed.
            if (this->manager->tooManyThreadsActive()) {
                break;
            }

            // Fetch the next queued task, if any.
            if (this->manager->queue.empty()) {
                r = nullptr;
            }
            else {
                r = this->manager->queue.front().first;
                this->manager->queue.pop_front();
            }
        } while (r);

        // The pool is shutting down (set by ThreadPoolPrivate::reset()).
        if (this->manager->isExiting) {
            this->registerThreadInactive();
            break;
        }

        bool expired = this->manager->tooManyThreadsActive();
        if (!expired) {
            // Park as a waiting thread; while parked this worker does not
            // count as active.
            this->manager->waitingThreads.push_back(this);
            this->registerThreadInactive();
            this->runnableReady.wait_for(locker, std::chrono::milliseconds(manager->expiryTimeout));
            ++manager->activeThreads;

            // The pool removes a worker from waitingThreads before notifying
            // it about new work. If this worker is still listed, nobody
            // claimed it, so it removes itself and is treated as expired.
            for (auto it = this->manager->waitingThreads.begin(); it != this->manager->waitingThreads.end(); ++it) {
                if (*it == this) {
                    this->manager->waitingThreads.erase(it);
                    expired = true;
                    break;
                }
            }
        }

        if (expired) {
            // Make the worker object available for reuse by tryStart() and
            // let the thread function return.
            this->manager->expiredThreads.push_back(this);
            this->registerThreadInactive();
            break;
        }
    }
}

void ThreadPoolThread::registerThreadInactive(void)
{
    if (--this->manager->activeThreads == 0) {
        this->manager->noActiveThreads.notify_all();
    }
}

threadpool_p.h

#ifndef THREADPOOL_P_H
#define THREADPOOL_P_H

#include <condition_variable>
#include <list>
#include <mutex>
#include <set>
#include <utility>

class Runnable;
class ThreadPoolThread;

// Internal interface of the thread pool; every data member of the pool is
// kept here.
class ThreadPoolPrivate {
public:
    ThreadPoolPrivate();

    // Tries to execute a task right away; returns false when the number of
    // active threads has already reached the limit.
    bool tryStart(Runnable* runnable);

    // Inserts the task into the queue behind the tasks of the same priority.
    void enqueueTask(Runnable* runnable, int priority = 0);

    std::size_t activeThreadCount() const;

    // Hands queued tasks to threads for as long as tryStart() accepts them.
    void tryToStartMoreThreads();
    bool tooManyThreadsActive() const;

    void startThread(Runnable* runnable = nullptr);
    void reset();
    bool waitForDone(unsigned long msecs);
    void clear();

    // Steals a task: removes it from the queue without running it.
    bool stealRunnable(Runnable* runnable);
    // Steals a task (removes it from the queue) and runs it.
    void stealAndRunRunnable(Runnable* runnable);

    mutable std::mutex mutex;
    std::set<ThreadPoolThread*> allThreads;
    std::list<ThreadPoolThread*> waitingThreads;
    std::list<ThreadPoolThread*> expiredThreads;
    std::list<std::pair<Runnable*, int>> queue;
    std::condition_variable noActiveThreads;

    bool isExiting;
    unsigned long expiryTimeout;
    std::size_t maxThreadCount;
    std::size_t reservedThreads;
    std::size_t activeThreads;
};

#endif // THREADPOOL_P_H

threadpool_p.cpp

#include <algorithm>
#include <chrono>
#include <thread>
#include <memory>
#include "threadpool_p.h"
#include "threadpoolthread.h"
#include "runnable.h"

// Sets up an idle pool: not exiting, 30000 ms expiry timeout, no reserved or
// active threads. The thread limit is the reported hardware concurrency,
// clamped to at least 1 because hardware_concurrency() may report 0.
ThreadPoolPrivate::ThreadPoolPrivate(void)
    : isExiting(false)
    , expiryTimeout(30000)
    , maxThreadCount(std::max(std::thread::hardware_concurrency(), 1u))
    , reservedThreads(0)
    , activeThreads(0)
{
}

// Tries to get `task` running immediately.
//
// Order of attempts:
//   1. no thread exists yet        -> always create the first thread;
//   2. active-thread limit reached -> give up and return false;
//   3. a worker is waiting         -> queue the task and wake that worker;
//   4. an expired worker exists    -> restart it with the task;
//   5. otherwise                   -> create a new thread.
//
// This function does not lock `mutex` itself; ThreadPool::start() and
// ThreadPool::tryStart() call it with the mutex already held.
//
// Returns true when the task was handed to a thread (or queued for a woken
// worker), false when the active-thread limit prevents it.
bool ThreadPoolPrivate::tryStart(Runnable* task)
{
    if (this->allThreads.empty()) {
        this->startThread(task);
        return true;
    }

    if (this->activeThreadCount() >= this->maxThreadCount) {
        return false;
    }

    if (!this->waitingThreads.empty()) {
        // The woken worker picks the task up from the queue.
        this->enqueueTask(task);
        ThreadPoolThread* t = this->waitingThreads.front();
        this->waitingThreads.pop_front();
        t->runnableReady.notify_one();
        return true;
    }

    if (!this->expiredThreads.empty()) {
        ThreadPoolThread* t = this->expiredThreads.front();
        this->expiredThreads.pop_front();

        ++this->activeThreads;

        if (task->autoDelete()) {
            ++task->m_ref;
        }

        t->runnable = task;

        // The expired worker has left its loop; wait until its thread has
        // fully finished before reusing the worker object.
        t->thread->join();

        // Create the replacement first, then free the finished std::thread
        // object. Previously the old object was simply overwritten and leaked.
        std::thread* const replacement = new std::thread(&ThreadPoolThread::operator(), t);
        delete t->thread;
        t->thread = replacement;
        return true;
    }

    this->startThread(task);
    return true;
}

// Comparison overload for use with std::upper_bound on the task queue: a bare
// priority orders before a queue entry when the entry's priority is lower,
// i.e. the queue is kept in descending priority order.
inline bool operator<(int priority, const std::pair<Runnable*, int>& p)
{
    return priority > p.second;
}

// Mirror overload: a queue entry orders before a bare priority when the
// entry's priority is higher.
inline bool operator<(const std::pair<Runnable*, int>& p, int priority)
{
    return p.second > priority;
}

// Adds a task to the queue. An auto-delete task gains one reference for the
// queue entry. The entry is placed after all entries whose priority is greater
// than or equal to `priority`, so equal-priority tasks keep insertion order.
void ThreadPoolPrivate::enqueueTask(Runnable* runnable, int priority)
{
    if (runnable->autoDelete()) {
        runnable->m_ref += 1;
    }

    // Relies on the operator<(int, pair) overload to compare a priority with
    // a queue entry.
    const auto insertPos = std::upper_bound(this->queue.begin(), this->queue.end(), priority);
    this->queue.emplace(insertPos, runnable, priority);
}

// Number of threads counted as active: all threads minus the waiting and
// expired ones, plus the reserved threads.
std::size_t ThreadPoolPrivate::activeThreadCount(void) const
{
    const std::size_t idle = this->waitingThreads.size() + this->expiredThreads.size();
    return this->allThreads.size() - idle + this->reservedThreads;
}

void ThreadPoolPrivate::tryToStartMoreThreads(void)
{
    while (!this->queue.empty() && this->tryStart(this->queue.front().first)) {
        this->queue.pop_front();
    }
}

// True when the active count exceeds the limit and more than one of the
// active threads is a non-reserved one.
bool ThreadPoolPrivate::tooManyThreadsActive(void) const
{
    const std::size_t active = this->activeThreadCount();
    if (active <= this->maxThreadCount) {
        return false;
    }

    return (active - this->reservedThreads) > 1;
}

// Creates a new worker, registers it in allThreads, counts it as active and
// starts its thread.
//
// `runnable` is the first task for the worker. It may be null (the declared
// default): the worker's loop skips a null task and goes straight to the
// queue. The null case is guarded here because the original dereferenced the
// pointer unconditionally.
void ThreadPoolPrivate::startThread(Runnable* runnable)
{
    std::unique_ptr<ThreadPoolThread> worker(new ThreadPoolThread(this));
    this->allThreads.insert(worker.get());
    ++this->activeThreads;

    // The running worker holds one reference on an auto-delete task.
    if (runnable && runnable->autoDelete()) {
        ++runnable->m_ref;
    }

    worker->runnable = runnable;
    worker->thread   = new std::thread(&ThreadPoolThread::operator(), worker.get());

    // The worker object is now tracked through allThreads and released in reset().
    worker.release();
}

void ThreadPoolPrivate::reset(void)
{
    std::unique_lock<std::mutex> locker(this->mutex);
    this->isExiting = true;

    while (!this->allThreads.empty()) {
        std::set<ThreadPoolThread*> allThreadsCopy;
        allThreadsCopy.swap(this->allThreads);
        locker.unlock();

        for (auto it = allThreadsCopy.begin(); it != allThreadsCopy.end(); ++it) {
            (*it)->runnableReady.notify_all();
            (*it)->thread->join();
            delete (*it)->thread;
            delete (*it);
        }

        locker.lock();
    }

    this->waitingThreads.clear();
    this->expiredThreads.clear();
    isExiting = false;
}

bool ThreadPoolPrivate::waitForDone(unsigned long int msecs)
{
    std::unique_lock<std::mutex> locker(this->mutex);
    if (msecs == std::numeric_limits<unsigned long int>::max()) {
        while (!(this->queue.empty() && this->activeThreads == 0)) {
            this->noActiveThreads.wait(locker);
        }
    }
    else {
        auto start = std::chrono::steady_clock::now();
        auto till  = start + std::chrono::milliseconds(msecs);
        while (
               !(this->queue.empty() && this->activeThreads == 0)
            && std::chrono::steady_clock::now() < till
        ) {
            this->noActiveThreads.wait_until(locker, till);
        }
    }

    return this->queue.empty() && !this->activeThreads;
}

void ThreadPoolPrivate::clear(void)
{
    std::unique_lock<std::mutex> locker(this->mutex);
    while (!this->queue.empty()) {
        std::pair<Runnable*, int>& item = this->queue.front();
        Runnable* r = item.first;
        if (r->autoDelete() && --r->m_ref) {
            delete r;
        }

        this->queue.pop_front();
    }
}

// Removes the first queue entry that refers to `runnable`. The task's
// reference count is left untouched; callers adjust it themselves.
//
// Returns true when an entry was removed, false for a null pointer or when
// the task is not queued.
bool ThreadPoolPrivate::stealRunnable(Runnable* runnable)
{
    if (runnable == nullptr) {
        return false;
    }

    std::unique_lock<std::mutex> locker(this->mutex);

    const auto hit = std::find_if(
        this->queue.begin(), this->queue.end(),
        [runnable](const std::pair<Runnable*, int>& entry) { return entry.first == runnable; });

    if (hit == this->queue.end()) {
        return false;
    }

    this->queue.erase(hit);
    return true;
}

// Takes `runnable` out of the queue and runs it on the calling thread. Does
// nothing when the task is not queued. For an auto-delete task the queue's
// reference is released before run(); the task is deleted after run() if that
// release brought the count to zero.
void ThreadPoolPrivate::stealAndRunRunnable(Runnable* runnable)
{
    if (!this->stealRunnable(runnable)) {
        return;
    }

    bool deleteAfterRun = false;
    if (runnable->autoDelete()) {
        --runnable->m_ref;
        deleteAfterRun = (runnable->m_ref == 0);
    }

    runnable->run();

    if (deleteAfterRun) {
        delete runnable;
    }
}

threadpool.h

#ifndef THREADPOOL_H
#define THREADPOOL_H

#include <limits>
#include <memory>
#include <vector>

class Runnable;
class ThreadPoolPrivate;

// Thread pool: the public interface intended for users.
class ThreadPool {
public:
    ThreadPool();
    ~ThreadPool();

    // Starts a task; if no thread is free the task is queued.
    void start(Runnable* runnable, int priority = 0);
    // Tries to start a task; returns false if no thread is free.
    bool tryStart(Runnable* runnable);

    // Expiry timeout, i.e. how long an idle thread waits for work.
    unsigned long expiryTimeout() const;
    void setExpiryTimeout(unsigned long v);

    // Maximum number of threads; initialised to the number of CPU cores.
    std::size_t maxThreadCount() const;
    void setMaxThreadCount(std::size_t n);

    // Number of active threads.
    std::size_t activeThreadCount() const;

    // Size of the task queue.
    std::size_t queueSize() const;

    // Reserves a thread (sets it aside for now); every call reserves one more.
    void reserveThread();
    // Releases a reserved thread so it can be used; every call releases one.
    void releaseThread();

    // Waits until all active threads have finished.
    bool waitForDone(unsigned long timeout = std::numeric_limits<unsigned long>::max());

    // Discards all tasks that have not been executed yet.
    void clear();

    // Removes a task.
    void cancel(Runnable* runnable);

private:
    friend class ThreadPoolPrivate;

    const std::unique_ptr<ThreadPoolPrivate> d_ptr;

    // Accessors for the private implementation object.
    ThreadPoolPrivate* d_func() { return d_ptr.get(); }
    const ThreadPoolPrivate* d_func() const { return d_ptr.get(); }
};

#endif // THREADPOOL_H

threadpool.cpp

#include <mutex>
#include "threadpool.h"
#include "threadpool_p.h"
#include "threadpoolthread.h"
#include "runnable.h"

ThreadPool::ThreadPool(void)
    : d_ptr(new ThreadPoolPrivate())
{
}

// Waits (with the default, unlimited timeout) for all work to finish before
// the pool is destroyed.
ThreadPool::~ThreadPool(void)
{
    waitForDone();
}

// Schedules a task. A null task is ignored. If the task cannot be started
// immediately it is queued with the given priority, and one waiting worker
// (if there is one) is woken up.
void ThreadPool::start(Runnable* runnable, int priority)
{
    if (runnable == nullptr) {
        return;
    }

    ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);

    if (d->tryStart(runnable)) {
        return;
    }

    d->enqueueTask(runnable, priority);

    if (d->waitingThreads.empty()) {
        return;
    }

    ThreadPoolThread* const idle = d->waitingThreads.front();
    d->waitingThreads.pop_front();
    idle->runnableReady.notify_one();
}

// Tries to start a task without queueing it. Returns false for a null task,
// when the pool has no threads yet and the active count (then only the
// reserved threads) already reaches the limit, or when the private tryStart()
// refuses the task.
bool ThreadPool::tryStart(Runnable* runnable)
{
    if (runnable == nullptr) {
        return false;
    }

    ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);

    const bool refused = d->allThreads.empty() && d->activeThreadCount() >= d->maxThreadCount;
    return refused ? false : d->tryStart(runnable);
}

unsigned long int ThreadPool::expiryTimeout(void) const
{
    return this->d_func()->expiryTimeout;
}

void ThreadPool::setExpiryTimeout(unsigned long int v)
{
    this->d_func()->expiryTimeout = v;
}

std::size_t ThreadPool::maxThreadCount(void) const
{
    return this->d_func()->maxThreadCount;
}

// Changes the maximum number of threads and, if the value changed, tries to
// move queued tasks onto threads under the new limit.
//
// The pool mutex is held for the whole operation: tryToStartMoreThreads()
// modifies the queue and the thread bookkeeping that workers access under the
// same mutex, and releaseThread() calls it with the mutex held as well.
void ThreadPool::setMaxThreadCount(std::size_t n)
{
    ThreadPoolPrivate* const d = this->d_func();
    std::unique_lock<std::mutex> locker(d->mutex);

    if (d->maxThreadCount == n) {
        return;
    }

    d->maxThreadCount = n;
    d->tryToStartMoreThreads();
}

// Returns the number of active threads, read under the pool mutex.
std::size_t ThreadPool::activeThreadCount(void) const
{
    const ThreadPoolPrivate* const d = this->d_func();
    std::lock_guard<std::mutex> guard(d->mutex);
    const std::size_t count = d->activeThreadCount();
    return count;
}

std::size_t ThreadPool::queueSize(void) const
{
    return this->d_func()->queue.size();
}

// Reserves one more thread by incrementing the reserved-thread counter under
// the pool mutex.
void ThreadPool::reserveThread(void)
{
    ThreadPoolPrivate* const d = this->d_func();
    std::lock_guard<std::mutex> guard(d->mutex);
    d->reservedThreads += 1;
}

// Releases one reserved thread and then tries to start queued tasks with the
// capacity that became available. Runs under the pool mutex.
void ThreadPool::releaseThread(void)
{
    ThreadPoolPrivate* const d = this->d_func();
    std::lock_guard<std::mutex> guard(d->mutex);
    d->reservedThreads -= 1;
    d->tryToStartMoreThreads();
}

// Waits up to `msec` milliseconds for the pool to become idle. On success the
// pool's threads are shut down through reset(). Returns whether the pool
// became idle in time.
bool ThreadPool::waitForDone(unsigned long int msec)
{
    ThreadPoolPrivate* const d = this->d_func();

    if (!d->waitForDone(msec)) {
        return false;
    }

    d->reset();
    return true;
}

void ThreadPool::clear(void)
{
    this->d_func()->clear();
}

// Removes a queued task without running it. If the task was queued and is an
// auto-delete task, the queue's reference is released and the task is deleted
// when no reference remains.
void ThreadPool::cancel(Runnable* runnable)
{
    ThreadPoolPrivate* const d = this->d_func();
    const bool wasQueued = d->stealRunnable(runnable);

    if (wasQueued && runnable->autoDelete()) {
        --runnable->m_ref;
        if (runnable->m_ref == 0) {
            delete runnable;
        }
    }
}

main.cpp

#include <iostream>
#include <thread>
#include <mutex>
#include "threadpool.h"
#include "runnable.h"

// Serializes the output written by the tasks below.
std::mutex s_mutex;

// Demo task: prints its number together with the id of the thread running it.
class Task : public Runnable
{
public:
    Task(int i) : i_(i) {}

    void run()
    {
        // Hold the mutex for the whole statement so lines are not interleaved.
        std::lock_guard<std::mutex> guard(s_mutex);
        //std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::cout << "number:" << i_ << ",thread id:" << std::this_thread::get_id() << std::endl;
    }

private:
    int i_;
};

// Demo entry point: submits 100 printing tasks to a pool, then pauses the
// console. The pool is destroyed when `pool` goes out of scope on return.
int _tmain(int argc, _TCHAR* argv[])
{
    std::unique_ptr<ThreadPool> pool(new ThreadPool());

    int next = 0;
    while (next < 100) {
        pool->start(new Task(next));
        ++next;
    }

    system("pause");
    return 0;
}