我想对代码进行一下回顾,重点是:
标准符合性/可移植性问题/最佳实践
线程安全/异常处理
API
性能问题(是的,我知道
std::mutex
在Windows上运行缓慢,但除此之外:)关于如何摆脱void
返回类型的特殊性的任何想法?我已在类声明中内联了所有方法,以使其在此处易于阅读。在实际的实现中,它们没有内联。很抱歉在某些地方出现换行失败,CR代码窗口比我想要的要窄。
#include <functional>
#include <future>
#include <deque>
#include <thread>
/// <summary> A typical thread worker queue that can execute arbitrary jobs.
/// </summary>
///
/// <remarks>
/// * Thread Safety : Full.
/// * Exception Safety: Strong. </remarks>
class WorkQueue{
public:
/// <summary> Constructors a new work queue object. </summary>
/// <param name="numWorkers"> (Optional) number of workers, less than 0 to
/// auto-detect (may fail on esoteric platforms). </param>
explicit WorkQueue(int numWorkers = -1){
if (numWorkers < 1){
numWorkers = std::thread::hardware_concurrency() + 1;
}
while (numWorkers--){
m_workers.emplace_back(std::thread(&WorkQueue::doWork, this));
}
}
/// <summary> Will abort all pending jobs and run any in-progress jobs to
/// completion upon destruction. </summary>
~WorkQueue(){
abort();
}
/// <summary> Stops work queue and finishes jobs currently being executed.
/// Queued jobs that have not begun execution will have their promises
/// broken. </summary>
void abort(){
m_exit = true;
m_finish_work = false;
m_signal.notify_all();
joinAll();
{
std::lock_guard<std::mutex> lg(m_mutex);
m_work.clear();
}
}
/// <summary> Stops new work from being submitted to this work queue.</summary>
void stop(){
m_exit = true;
m_finish_work = true;
m_signal.notify_all();
}
/// <summary> Wait for completion of all submitted work. No more work will
/// be allowed to be submitted. </summary>
void waitForCompletion(){
stop();
joinAll();
assert(m_work.empty());
}
/// <summary> Executes the given function asynchronously. </summary>
/// <exception cref="std::runtime_error"> Thrown if attempting to submit a job
/// to a work queue that is terminating. </exception>
/// <param name="function"> [in] The function to execute. </param>
/// <returns> A std::future<RETVAL> for the result that will be generated by
/// the function argument. Exceptions from the function will be
/// thrown by get() on the future.</returns>
template<typename RETVAL>
std::future<RETVAL> submit(std::function<RETVAL()>&& function){
if (m_exit){
throw std::runtime_error("Caught work submission to work queue that is desisting.");
}
// Workaround for lack of lambda move capture
typedef std::pair<std::promise<RETVAL>, std::function<RETVAL()>> pair_t;
std::shared_ptr<pair_t> data = std::make_shared<pair_t>(std::promise<RETVAL>(), std::move(function));
std::future<RETVAL> future = data->first.get_future();
{
std::lock_guard<std::mutex> lg(m_mutex);
m_work.emplace_back([data](){
try{
data->first.set_value(data->second());
}
catch (...){
data->first.set_exception(std::current_exception());
}
});
}
m_signal.notify_one();
return std::move(future);
}
template<>
std::future<void> submit(std::function<void()>&& function){
if (m_exit){
throw std::runtime_error("Caught work submission to work queue that is desisting.");
}
// Workaround for lack of lambda move capture
typedef std::pair<std::promise<void>, std::function<void()>> pair_t;
std::shared_ptr<pair_t> data = std::make_shared<pair_t>(std::promise<void>(), std::move(function));
std::future<void> future = data->first.get_future();
{
std::lock_guard<std::mutex> lg(m_mutex);
m_work.emplace_back([data](){
try{
data->second();
data->first.set_value();
}
catch (...){
data->first.set_exception(std::current_exception());
}
});
}
m_signal.notify_one();
return std::move(future);
}
private:
std::deque<std::function<void()>> m_work;
std::mutex m_mutex;
std::condition_variable m_signal;
std::atomic<bool> m_exit{ false };
std::atomic<bool> m_finish_work{ true };
std::vector<std::thread> m_workers;
void doWork(){
std::unique_lock<std::mutex> ul(m_mutex);
while (!m_exit || (m_finish_work && !m_work.empty())){
if (!m_work.empty()){
std::function<void()> work(std::move(m_work.front()));
m_work.pop_front();
ul.unlock();
work();
ul.lock();
}
else{
m_signal.wait(ul);
}
}
}
void joinAll(){
for (auto& thread : m_workers){
thread.join();
}
m_workers.clear();
}
void operator=(const WorkQueue&) = delete;
WorkQueue(const WorkQueue&) = delete;
};
示例用法:
int main(){
WorkQueue wq;
wq.submit<void>([](){std::cout << "foo" << std::endl; });
wq.submit<void>([](){std::cout << "bar" << std::endl; });
std::future<int> f0 = wq.submit<int>([](){return 4; });
std::future<int> f1 = wq.submit<int>([](){return 40; });
std::cout << f1.get() << std::endl;
wq.waitForCompletion();
std::cout << f.get() << std::endl;
return 0;
}
#1 楼
请注意,代码的显示方式应与使用它的方式相同,以避免虚假错误。按原样编译代码,使我对submit
的专业化产生了错误。命名告诉存储的内容
pair_t
可能会有所改善,尽管我还没有任何想法m_work
应该重命名为m_mutex
以表明其用途(或捆绑销售)它与m_work_mutex
一起使用,以致无法在不锁定m_work
的情况下访问m_work
) m_mutex
应该命名为m_signal
(或m_exit
,其逻辑应颠倒,因为(布尔)变量名的取反使推理更困难)m_accept_no_more_work
可以命名为accept_more_work
自动导出返回类型
为什么提取时将返回类型赋予
m_workers
会困扰用户?template <typename FunctionObject>
auto submit(FunctionObject &&function) -> std::future<decltype(function())>;
删除
m_worker_threads
中的代码重复您的两个
submit
函数具有相同的代码,表明您已开启template参数太粗糙。唯一不同的部分在submit
块中。显而易见的解决方案是将不同部分提取到自己的函数中。简而言之,我将使用模板功能专门化: >它们替换了try catch块中的呼叫。由于submit
与这些类型之间存在非平凡的依赖关系,因此不能用于自动模板参数推导:template <>
inline void WorkQueue::execute_and_set_data<void>(const DataPointer<void> &data) {
data->second();
data->first.set_value();
}
template <typename ReturnType>
void WorkQueue::execute_and_set_data(const DataPointer<ReturnType> &data) {
data->first.set_value(data->second());
}
但是,我建议用一个模板化的
try{}
替换ReturnType
,该模板具有比PromiseFunctionPair
和struct
更好的成员名称,这将允许自动模板参数推导。遗漏包括
您正在使用不带
first
的second
您正在使用不带
std::vector
的#include <vector>
错别字
assert
应该应该是#include <cassert>
吗?除了上述缺陷,我非常喜欢阅读您的代码:)
#2 楼
首先,我要说这是一个编写良好的文档,并提供了很好的文档。我还认为,没人能提供很好的建议,因此我将主要基于此。我只有一个正确的评论:您在
joinAll
中有问题。如果从多个线程调用waitForCompletion
和/或abort
(我当然可以想象一个调用waitForCompletion
的线程,另一个调用abort
的线程),则您在joinAll
中存在竞争;当另一个调用m_workers.clear()
时,一个线程可能处于迭代中。请尝试以下操作:void joinAll(){
std::vector<std::thread> workers;
{
std::lock_guard<std::mutex> lg(m_mutex);
workers = std::move(m_workers);
}
for (auto& thread : workers) {
thread.join();
}
}
(这只是部分解决方案;您必须添加条件变量或类似变量,以确保将来对
joinAll
的调用不会t会在此线程加入所有线程之前返回。)您记录
WorkQueue::WorkQueue
,说numWorkers < 0
会导致自动检测,但是只要numWorkers < 1
就会自动检测。也就是说,您的文档和实现在numWorkers == 0
时的行为上有所不同。为什么自动检测时创建的线程比硬件支持的并发线程数多?这是否只是为了避免对
0
的潜在收益进行特殊处理?我建议为
m_workers
保留空间:m_workers.reserve(numWorkers);
。在这种情况下,它很小,但却是一个好习惯。初始化
m_workers
时,请使用m_workers.emplace_back(std::thread(...))
。我可以使用m_workers.emplace_back(...)
或m_workers.push_back(std::thread(...))
(我更喜欢第一个)。 emplace_back
采用元素类型的构造函数参数,并且push_back
具有右值引用重载。在
stop
中,我避免设置m_finish_work
;如果abort
已经被调用,我不认为您要覆盖该设置,如果尚未调用,则m_finish_work
已经为true。我当然同意没人建议(a)提取
execute_and_set_data
和(b)提交由function
类型参数化的模板,而不是将其强制为std::function<T()>
的建议。我也同意您应该使用命名字段引入struct
而不是使用std::pair
。submit
应该返回future
而不是std::move(future)
。从函数返回局部变量已经移动了它,并且显式调用std::move
会禁止NRVO。或者,返回data->first.get_future()
;您永远不会在其定义和返回行之外引用future
(也许这是您将promise
移开的代码的早期版本中的工件?)。我想看看多使用
auto
。 auto
可以减少冗余代码的混乱,使代码更易于阅读和修改。示例:std::shared_ptr<pair_t> data = std::make_shared<pair_t>(...);
您正在用几个字符写两次该类型。 std::make_shared<T>
是一个众所周知的函数,总是返回std::shared_ptr<T>
;当您指定要存储在其中的变量的类型时,您在强迫我花精力确保该变量的类型有意义(是否相同?它可能是基类指针吗?)。std::future<T> future = data->first.get_future();
这使您无法在函数类型上进行模板化,而不是在返回类型上进行模板化,从而使代码的通用性降低。std::function<void()> work(std::move(m_work.front()));
这里发生了很多事情。我认为auto work = std::move(m_work.front());
的读起来要好得多。我不同意的唯一一点是关于某些命名注释。 m_work
,m_mutex
和m_workers
看起来都像是好名字。但是,我肯定会在m_work
和m_workers
上添加注释,指示仅当持有m_mutex
的锁时才应对其进行读取或写入,并在m_signal
中添加注释,仅当持有m_mutex
的锁时应对其进行信号通知或等待。评论
\ $ \ begingroup \ $
感谢您的评论!在线程尝试重新获取互斥锁时,在joinAll中锁定m_mutex将创建死锁。我为此需要另一个互斥锁用于m_threads。构造函数的文档已经过时,我很快就发现:3拥有numcpu + 1个工作线程来提高CPU资源利用率是一个普遍的经验法则。 +1是为了在IO上阻塞另一个线程的同时在CPU上执行工作。但不是+ n> 1以避免频繁的线程上下文切换。那并很好地避开了潜在的收益0;)
\ $ \ endgroup \ $
–艾米莉·L。
2014年8月19日在10:33
\ $ \ begingroup \ $
糟糕,您对僵局的看法是正确的;我已编辑删除了该建议,并提出了另一种建议。
\ $ \ endgroup \ $
– ruds
2014年8月19日在12:04
\ $ \ begingroup \ $
我建议的joinAll的替代方法是,只需在该类中添加第二个std :: mutex以保护m_workers。
\ $ \ endgroup \ $
– ruds
14年8月19日在13:23
\ $ \ begingroup \ $
std :: move使向量处于未指定的可破坏状态。更具体地说,不需要std :: move使其移动的容器上的(empty()返回true),这可能导致其他线程进入该函数时出现问题。更好的解决方案是使用worker.swap(m_workers)。无论如何,这种对joinAll的解决方案将函数的语义更改为不再阻塞,直到所有线程都被连接(只有第一个条目将被阻塞)。我相信第二个互斥锁是保留语义的唯一选择。
\ $ \ endgroup \ $
–艾米莉·L。
2014年8月19日13:40
\ $ \ begingroup \ $
@EmilyL。没错,这将断言,但是我不认为您的类的语义要求向量为空。
\ $ \ endgroup \ $
– ruds
14年8月19日在13:43
评论
\ $ \ begingroup \ $
很棒的评论!谢谢!顺便问一下,您使用了什么编译器?提交方法的编译与VS2013相同。至于包含,我的默认PCH必须包含包含矢量,并且它在测试程序中。 f.get()是一个错字,是的,我在事实之后补充了一个例子。我以为我不会弄乱这么简单的加法...数字大声笑。我将等待一个更长的时间来接受答案,看看是否还有更多评论。
\ $ \ endgroup \ $
–艾米莉·L。
2014年8月19日在9:45
\ $ \ begingroup \ $
我同时使用了gcc 4.9和clang ++ 3.6,后者给出了错误错误:类范围内'submit'的显式专门化请参见此SO答案,了解标准在此问题上的立场。关于PCH评论:我已经多次遇到这个问题。当您仅在包含必需标头的源文件中使用标头时,也会发生此问题。对于标头有一个虚拟源文件会有所帮助,该文件不包含任何其他内容(并且没有PCH)以检测丢失的包含文件。
\ $ \ endgroup \ $
–没人远离SE
2014年8月19日在10:55