作为使用C ++ 11功能的练习,我决定创建一个线程池类。

我想对代码进行一下回顾,重点是:


标准符合性/可移植性问题/最佳实践
线程安全/异常处理
API
性能问题(是的,我知道std::mutex在Windows上运行缓慢,但除此之外:)关于如何摆脱void返回类型的特殊性的任何想法?

我已在类声明中内联了所有方法,以使其在此处易于阅读。在实际的实现中,它们没有内联。很抱歉在某些地方出现换行失败,CR代码窗口比我想要的要窄。

#include <functional>
#include <future>
#include <deque>
#include <thread>

/// <summary> A typical thread worker queue that can execute arbitrary jobs. 
/// </summary>
///
/// <remarks>
///  * Thread Safety   : Full.
///  * Exception Safety: Strong. </remarks>
class WorkQueue{
public:
    /// <summary> Constructors a new work queue object. </summary>
    /// <param name="numWorkers"> (Optional) number of workers, less than 0 to 
    ///     auto-detect (may fail on esoteric platforms). </param>
    explicit WorkQueue(int numWorkers = -1){
        if (numWorkers < 1){
            numWorkers = std::thread::hardware_concurrency() + 1;
        }
        while (numWorkers--){
            m_workers.emplace_back(std::thread(&WorkQueue::doWork, this));
        }
    }

    /// <summary> Will abort all pending jobs and run any in-progress jobs to 
    ///     completion upon destruction. </summary>
    ~WorkQueue(){
        abort();
    }

    /// <summary> Stops work queue and finishes jobs currently being executed.
    ///     Queued jobs that have not begun execution will have their promises 
    ///     broken. </summary>
    void abort(){
        m_exit = true;
        m_finish_work = false;
        m_signal.notify_all();
        joinAll();

        {
            std::lock_guard<std::mutex> lg(m_mutex);
            m_work.clear();
        }
    }

    /// <summary> Stops new work from being submitted to this work queue.</summary>
    void stop(){
        m_exit = true;
        m_finish_work = true;
        m_signal.notify_all();
    }

    /// <summary> Wait for completion of all submitted work. No more work will 
    ///     be allowed to be submitted. </summary>
    void waitForCompletion(){
        stop();
        joinAll();
        assert(m_work.empty());
    }

    /// <summary> Executes the given function asynchronously. </summary>
    /// <exception cref="std::runtime_error"> Thrown if attempting to submit a job
    ///     to a work queue that is terminating. </exception>
    /// <param name="function"> [in] The function to execute. </param>
    /// <returns> A std::future<RETVAL> for the result that will be generated by 
    ///     the function argument. Exceptions from the function will be 
    ///     thrown by get() on the future.</returns>
    template<typename RETVAL>
    std::future<RETVAL> submit(std::function<RETVAL()>&& function){
        if (m_exit){
            throw std::runtime_error("Caught work submission to work queue that is desisting.");
        }

        // Workaround for lack of lambda move capture
        typedef std::pair<std::promise<RETVAL>, std::function<RETVAL()>> pair_t;
        std::shared_ptr<pair_t> data = std::make_shared<pair_t>(std::promise<RETVAL>(), std::move(function));

        std::future<RETVAL> future = data->first.get_future();

        {
            std::lock_guard<std::mutex> lg(m_mutex);
            m_work.emplace_back([data](){
                try{
                    data->first.set_value(data->second());
                }
                catch (...){
                    data->first.set_exception(std::current_exception());
                }
            });
        }
        m_signal.notify_one();
        return std::move(future);
    }

    template<>
    std::future<void> submit(std::function<void()>&& function){
        if (m_exit){
            throw std::runtime_error("Caught work submission to work queue that is desisting.");
        }
        // Workaround for lack of lambda move capture
        typedef std::pair<std::promise<void>, std::function<void()>> pair_t;
        std::shared_ptr<pair_t> data = std::make_shared<pair_t>(std::promise<void>(), std::move(function));

        std::future<void> future = data->first.get_future();

        {
            std::lock_guard<std::mutex> lg(m_mutex);
            m_work.emplace_back([data](){
                try{
                    data->second();
                    data->first.set_value();
                }
                catch (...){
                    data->first.set_exception(std::current_exception());
                }
            });
        }
        m_signal.notify_one();

        return std::move(future);
    }

private:
    std::deque<std::function<void()>> m_work;
    std::mutex m_mutex;
    std::condition_variable m_signal;
    std::atomic<bool> m_exit{ false };
    std::atomic<bool> m_finish_work{ true };
    std::vector<std::thread> m_workers;

    void doWork(){
        std::unique_lock<std::mutex> ul(m_mutex);
        while (!m_exit || (m_finish_work && !m_work.empty())){
            if (!m_work.empty()){
                std::function<void()> work(std::move(m_work.front()));
                m_work.pop_front();
                ul.unlock();
                work();
                ul.lock();
            }
            else{
                m_signal.wait(ul);
            }
        }
    }

    void joinAll(){
        for (auto& thread : m_workers){
            thread.join();
        }
        m_workers.clear();
    }

    void operator=(const WorkQueue&) = delete;
    WorkQueue(const WorkQueue&) = delete;
};


示例用法:

int main(){
    WorkQueue wq;
    wq.submit<void>([](){std::cout << "foo" << std::endl; });
    wq.submit<void>([](){std::cout << "bar" << std::endl; });
    std::future<int> f0 = wq.submit<int>([](){return 4; });
    std::future<int> f1 = wq.submit<int>([](){return 40; });

    std::cout << f1.get() << std::endl;
    wq.waitForCompletion();
    std::cout << f.get() << std::endl;
    return 0;
}


#1 楼

请注意,代码的显示方式应与使用它的方式相同,以避免虚假错误。按原样编译代码,使我对submit的专业化产生了错误。

命名告诉存储的内容

pair_t可能会有所改善,尽管我还没有任何想法

m_work应该重命名为m_mutex以表明其用途(或捆绑销售)它与m_work_mutex一起使用,以致无法在不锁定m_work的情况下访问m_work

m_mutex应该命名为m_signal(或m_exit,其逻辑应颠倒,因为(布尔)变量名的取反使推理更困难)

m_accept_no_more_work可以命名为accept_more_work


自动导出返回类型

为什么提取时将返回类型赋予m_workers会困扰用户?

template <typename FunctionObject>
auto submit(FunctionObject &&function) -> std::future<decltype(function())>;


删除m_worker_threads中的代码重复


您的两个submit函数具有相同的代码,表明您已开启template参数太粗糙。唯一不同的部分在submit块中。显而易见的解决方案是将不同部分提取到自己的函数中。简而言之,我将使用模板功能专门化: >它们替换了try catch块中的呼叫。由于submit与这些类型之间存在非平凡的依赖关系,因此不能用于自动模板参数推导:

template <>
inline void WorkQueue::execute_and_set_data<void>(const DataPointer<void> &data) {
    data->second();
    data->first.set_value();
}

template <typename ReturnType>
void WorkQueue::execute_and_set_data(const DataPointer<ReturnType> &data) {
    data->first.set_value(data->second());
}


但是,我建议用一个模板化的try{}替换ReturnType,该模板具有比PromiseFunctionPairstruct更好的成员名称,这将允许自动模板参数推导。

遗漏包括


您正在使用不带firstsecond

您正在使用不带std::vector#include <vector>


错别字




assert应该应该是#include <cassert>吗?

除了上述缺陷,我非常喜欢阅读您的代码:)

评论


\ $ \ begingroup \ $
很棒的评论!谢谢!顺便问一下,您使用了什么编译器?提交方法的编译与VS2013相同。至于包含,我的默认PCH必须包含包含矢量,并且它在测试程序中。 f.get()是一个错字,是的,我在事实之后补充了一个例子。我以为我不会弄乱这么简单的加法...数字大声笑。我将等待一个更长的时间来接受答案,看看是否还有更多评论。
\ $ \ endgroup \ $
–艾米莉·L。
2014年8月19日在9:45

\ $ \ begingroup \ $
我同时使用了gcc 4.9和clang ++ 3.6,后者给出了错误错误:类范围内'submit'的显式专门化请参见此SO答案,了解标准在此问题上的立场。关于PCH评论:我已经多次遇到这个问题。当您仅在包含必需标头的源文件中使用标头时,也会发生此问题。对于标头有一个虚拟源文件会有所帮助,该文件不包含任何其他内容(并且没有PCH)以检测丢失的包含文件。
\ $ \ endgroup \ $
–没人远离SE
2014年8月19日在10:55



#2 楼

首先,我要说这是一个编写良好的文档,并提供了很好的文档。我还认为,没人能提供很好的建议,因此我将主要基于此。

我只有一个正确的评论:您在joinAll中有问题。如果从多个线程调用waitForCompletion和/或abort(我当然可以想象一个调用waitForCompletion的线程,另一个调用abort的线程),则您在joinAll中存在竞争;当另一个调用m_workers.clear()时,一个线程可能处于迭代中。请尝试以下操作:

void joinAll(){
    std::vector<std::thread> workers;
    {
        std::lock_guard<std::mutex> lg(m_mutex);
        workers = std::move(m_workers);
    }
    for (auto& thread : workers) {
        thread.join();
    }
}


(这只是部分解决方案;您必须添加条件变量或类似变量,以确保将来对joinAll的调用不会t会在此线程加入所有线程之前返回。)

您记录WorkQueue::WorkQueue,说numWorkers < 0会导致自动检测,但是只要numWorkers < 1就会自动检测。也就是说,您的文档和实现在numWorkers == 0时的行为上有所不同。

为什么自动检测时创建的线程比硬件支持的并发线程数多?这是否只是为了避免对0的潜在收益进行特殊处理?

我建议为m_workers保留空间:m_workers.reserve(numWorkers);。在这种情况下,它很小,但却是一个好习惯。

初始化m_workers时,请使用m_workers.emplace_back(std::thread(...))。我可以使用m_workers.emplace_back(...)m_workers.push_back(std::thread(...))(我更喜欢第一个)。 emplace_back采用元素类型的构造函数参数,并且push_back具有右值引用重载。

stop中,我避免设置m_finish_work;如果abort已经被调用,我不认为您要覆盖该设置,如果尚未调用,则m_finish_work已经为true。

我当然同意没人建议(a)提取execute_and_set_data和(b)提交由function类型参数化的模板,而不是将其强制为std::function<T()>的建议。我也同意您应该使用命名字段引入struct而不是使用std::pair

submit应该返回future而不是std::move(future)。从函数返回局部变量已经移动了它,并且显式调用std::move会禁止NRVO。或者,返回data->first.get_future();您永远不会在其定义和返回行之外引用future(也许这是您将promise移开的代码的早期版本中的工件?)。

我想看看多使用autoauto可以减少冗余代码的混乱,使代码更易于阅读和修改。示例:


std::shared_ptr<pair_t> data = std::make_shared<pair_t>(...);您正在用几个字符写两次该类型。 std::make_shared<T>是一个众所周知的函数,总是返回std::shared_ptr<T>;当您指定要存储在其中的变量的类型时,您在强迫我花精力确保该变量的类型有意义(是否相同?它可能是基类指针吗?)。
std::future<T> future = data->first.get_future();这使您无法在函数类型上进行模板化,而不是在返回类型上进行模板化,从而使代码的通用性降低。
std::function<void()> work(std::move(m_work.front()));这里发生了很多事情。我认为auto work = std::move(m_work.front());的读起来要好得多。我不同意的唯一一点是关于某些命名注释。 m_workm_mutexm_workers看起来都像是好名字。但是,我肯定会在m_workm_workers上添加注释,指示仅当持有m_mutex的锁时才应对其进行读取或写入,并在m_signal中添加注释,仅当持有m_mutex的锁时应对其进行信号通知或等待。

评论


\ $ \ begingroup \ $
感谢您的评论!在线程尝试重新获取互斥锁时,在joinAll中锁定m_mutex将创建死锁。我为此需要另一个互斥锁用于m_threads。构造函数的文档已经过时,我很快就发现:3拥有numcpu + 1个工作线程来提高CPU资源利用率是一个普遍的经验法则。 +1是为了在IO上阻塞另一个线程的同时在CPU上执行工作。但不是+ n> 1以避免频繁的线程上下文切换。那并很好地避开了潜在的收益0;)
\ $ \ endgroup \ $
–艾米莉·L。
2014年8月19日在10:33



\ $ \ begingroup \ $
糟糕,您对僵局的看法是正确的;我已编辑删除了该建议,并提出了另一种建议。
\ $ \ endgroup \ $
– ruds
2014年8月19日在12:04



\ $ \ begingroup \ $
我建议的joinAll的替代方法是,只需在该类中添加第二个std :: mutex以保护m_workers。
\ $ \ endgroup \ $
– ruds
14年8月19日在13:23

\ $ \ begingroup \ $
std :: move使向量处于未指定的可破坏状态。更具体地说,不需要std :: move使其移动的容器上的(empty()返回true),这可能导致其他线程进入该函数时出现问题。更好的解决方案是使用worker.swap(m_workers)。无论如何,这种对joinAll的解决方案将函数的语义更改为不再阻塞,直到所有线程都被连接(只有第一个条目将被阻塞)。我相信第二个互斥锁是保留语义的唯一选择。
\ $ \ endgroup \ $
–艾米莉·L。
2014年8月19日13:40



\ $ \ begingroup \ $
@EmilyL。没错,这将断言,但是我不认为您的类的语义要求向量为空。
\ $ \ endgroup \ $
– ruds
14年8月19日在13:43