ThreadPool
A simple C++11 Thread Pool implementation.
https://github.com/progschj/ThreadPool
class ThreadPool {
public:
ThreadPool(size_t);
template<class F, class... Args>
auto enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>;
~ThreadPool();
private:
// need to keep track of threads so we can join them
std::vector< std::thread > workers;
// the task queue
std::queue< std::function<void()> > tasks;
// synchronization
std::mutex queue_mutex;
std::condition_variable condition;
bool stop;
};
F&& 在这里的作用是进行完美转发(perfect forwarding)。
完美转发的目的是保持参数调用时的左值或右值的属性,并避免不必要的拷贝。
在 enqueue 方法中,参数 f 可以是左值或右值,例如:
void func(int, double);
auto f1 = func; // f1是左值
enqueue(f1, 1, 2.3);
enqueue(func, 1, 2.3); // func是右值
如果定义为 enqueue(F f),那么即使调用时传递的是右值,f也会强制转成左值引用,导致一次额外的拷贝操作。
而 F&& 可以避免这种强制转换:
- 当传入左值时,F&& 会转成 F&,保持左值属性
- 当传入右值时,F&& 保持右值属性
这样,通过 F&& 完美转发,既可以传递左值,也可以传递右值,而且不会发生额外拷贝,提高了效率。
所以 F&& 在这里就是用于实现完美转发,根据调用传入的参数类型选择最匹配的引用形式,避免意外拷贝,保持参数的左值或右值属性不变。
Args&& … 在这里表示的是对参数 args 进行完美转发。
其中:
Args&& 表示 args 是右值引用,既可以绑定到左值,也可以绑定到右值。
… 表示 Args 是参数包,可以接受 0 个或多个参数。
这样,Args&&… 允许传入左值或右值参数,并且不会发生额外的引用转换或拷贝:
- 当传入左值时,Args&& 会隐式转为 Args&,作为左值引用使用
- 当传入右值时,Args&& 保持右值引用
例如:
int x = 1;
double y = 2.3;
enqueue(func, x, y); // x, y是左值
enqueue(func, 1, 2.3); // 1, 2.3是右值
通过 Args&&… ,上述调用都可以正确转发,既可以处理左右值混合的情况,也可以处理全部左值或全部右值的情况。
所以 Args&&… 的作用就是实现对参数的完美转发,根据参数的实际类型进行引用折叠,避免额外的拷贝或转换。保持了参数调用时的左值/右值属性不变。
在 enqueue 函数的返回类型中:
auto enqueue(...) -> std::future<typename std::result_of<F(Args...)>::type>
-> 表示这是一个trailing return type语法,也就是返回类型后置语法。
trailing return type要求在函数参数声明之后再写返回类型。与普通的返回类型声明不同的是,这种语法可以在参数声明后才确定返回类型。
这里之所以需要后置返回类型语法,是因为返回类型依赖于参数F和Args的类型,需要先声明参数才能确定返回类型。
std::result_of::type 就是获取调用f(args…)的结果类型。
然后将其包装成std::future返回。
所以-> 在这里的作用就是指示这是一个后置返回类型语法,而返回类型需要依赖函数参数才能确定。
在这个返回类型声明中:
auto enqueue(...) -> std::future<typename std::result_of<F(Args...)>::type>
std::result_of::type 的作用是获取调用函数F时的返回类型。
std::result_of是一个模板类,用于通过参数推导结果类型。其中:
- F(Args…) 表示调用函数F并传入参数Args
- std::result_of 将推导出调用的结果类型
- ::type 取出结果类型
整个表示调用F(Args…)后返回的类型。
之所以需要推导返回类型,是因为函数F的具体类型在enqueue时是不确定的,需要通过参数推导。
比如:
int func(double);
enqueue(func, 2.3);
这里func的返回类型是int,通过 std::result_of 可以推导出调用的返回类型是int,然后封装到future中返回。
如果不使用std::result_of,则无法推导出具体的返回类型,因为函数F的类型不确定。
所以 std::result_of 在这里是用来根据参数推导出不确定函数调用的返回类型的,从而可以动态地返回future。
在这个返回类型中:
auto enqueue(...) -> std::future<typename std::result_of<F(Args...)>::type>
std::future 表示返回一个异步操作的未来结果的对象。
在线程池的enqueue操作中,把任务函数F打包进线程池执行,会得到一个未来结果。所以返回一个std::future对象,然后主线程可以在需要结果的时候调用future的get方法获取结果。
std::future的主要作用有:
- 可以获取异步操作的返回值 – 通过future的get()方法
- 提供同步机制 – 通过wait()等可以等待future Ready
- 当异步操作完成时,可以通过回调函数获取结果
所以在线程池中,返回std::future可以让调用者在需要结果时获取函数调用的返回值,并可以同步等待或通过回调方式获取结果。
未来对象也表示线程池会负责执行这个异步任务,调用者只需要关心结果即可。
总之,返回std::future实现了调用者与异步线程池任务执行之间的同步和数据传递。调用者可以通过future协调异步操作。
// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
: stop(false)
{
for(size_t i = 0;i<threads;++i)
workers.emplace_back(
[this]
{
for(;;)
{
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(this->queue_mutex);
this->condition.wait(lock,
[this]{ return this->stop || !this->tasks.empty(); });
if(this->stop && this->tasks.empty())
return;
task = std::move(this->tasks.front());
this->tasks.pop();
}
task();
}
}
);
}
这段代码是实现一个线程池的构造函数。
主要逻辑是:
- 在构造函数中,根据传入的线程数量threads,启动相应数量的线程工作者(worker)。
- 每个工作者是一个std::thread,在一个无限循环中等待任务。
- 使用condition变量在队列为空时等待。
- 当有新任务时,获取队列头部的任务,并执行。
- 当设置了stop标志并且队列为空时,工作者线程退出。
具体分析:
- stop是一个原子布尔变量,用于控制线程池的停止。
- queue_mutex是保护任务队列的互斥锁。
- condition是一个条件变量,用于线程等待。
- tasks是任务队列,用std::queue实现。
- 工作线程通过unique_lock获取队列的锁,在任务队列为空时wait,有新任务时执行。
- 当线程池要停止时,设置stop=true,并通知所有线程(condition.notify_all())
- 每个线程检查stop==true且任务队列为空时退出。
这样通过条件变量的等待通知机制,实现了线程池的基本调度功能。
总体上,利用C++11的多线程和同步特性,实现了一个基础的线程池。
在线程池构造函数中,有以下代码:
workers.emplace_back(
[this]
{ // ... } );
这里的[this]
是一个lambda表达式的捕获列表,它将ThreadPool对象的this指针捕获到了lambda闭包中。
这样做的目的是在lambda函数体内可以访问ThreadPool对象的成员变量和成员函数。
具体来看,对这个lambda的作用有:
- 可以访问ThreadPool的成员变量queue_mutex、condition、stop、tasks等,用于实现线程池的逻辑。
- 通过this指针调用ThreadPool的其他成员函数,如加入新任务、设置停止等。
- 手动对this进行生命周期管理,因为线程函数需要在ThreadPool对象生命周期内运作。
所以这个第一个this的作用是捕获ThreadPool对象指针,使线程函数可以访问该对象的公共接口,与对象本身进行交互。
如果不捕获this,则lambda函数内部无法访问ThreadPool的成员,也无法与之交互,无法实现线程池的功能。
所以它起到了连接线程函数与线程池对象的作用,是实现这个线程池的关键。
在线程池构造函数中的线程工作者(worker)循环中:
for(;;)
{
// 获取任务,执行任务
}
使用了一个无限循环for(;;)
。
这是为了让每个工作线程持续不断地运行,一直接收取任务并执行。
之所以需要一个无限循环,是因为线程池要求线程工作者永远在后台运行,等待接收新的任务。
如果没有无限循环,那么执行完当前任务后线程就会退出,无法继续处理后续任务。
具体作用是:
- 在循环内等待条件变量,当有新任务时获取任务
- 循环体内执行任务
- 然后再次循环等待新任务
这样就可以重复使用同一个线程处理无限多个任务,直到线程池被关闭。
所以这个无限循环的作用是让线程工作者持续运行,反复处理任务,实现线程池的重用与调度功能。
无限循环也避免了重复创建/销毁线程的性能开销。
所以它是线程池实现中的一种很常见的技术。
在这个线程池的实现中,this->condition 表示一个 std::condition_variable 类型的条件变量,它的作用是线程间的同步和通信。
具体的使用方式是:
- 在获取互斥锁后,使用 condition.wait() 让线程进入等待状态。
- 当需要通知时,在其他线程中调用 condition.notify_one()/notify_all() 唤醒等待的线程。
- 被唤醒的线程从 wait() 返回后就可以重新锁定互斥锁,然后继续执行。
示例代码:
// 等待线程
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, []{return ready;});
// 唤醒线程
{
std::lock_guard<std::mutex> lock(mutex);
ready = true;
}
condition.notify_one();
在这个线程池中,具体使用方式是:
- worker 线程等待在 condition.wait() 上,直到任务队列不空
- 当有新任务加入时,调用 condition.notify_one() 唤醒一个 worker
- 被唤醒的 worker 会从 wait() 返回,并执行任务
所以 this->condition 作为条件变量,实现了线程池的任务调度逻辑,是线程池实现的关键组件。
// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args)
-> std::future<typename std::result_of<F(Args...)>::type>
{
using return_type = typename std::result_of<F(Args...)>::type;
auto task = std::make_shared< std::packaged_task<return_type()> >(
std::bind(std::forward<F>(f), std::forward<Args>(args)...)
);
std::future<return_type> res = task->get_future();
{
std::unique_lock<std::mutex> lock(queue_mutex);
// don't allow enqueueing after stopping the pool
if(stop)
throw std::runtime_error("enqueue on stopped ThreadPool");
tasks.emplace([task](){ (*task)(); });
}
condition.notify_one();
return res;
}
std::packaged_task的作用是把一个可调用对象(如函数、lambda表达式)包装成一个任务对象。
std::packaged_task出现在C++11标准中,它提供了以下主要功能:
- 打包(包装)一个可调用对象,生成一个任务对象。
- 从任务对象中获取一个std::future对象,用于获取任务执行结果。
- 移动任务对象以传递所有权。
- 支持函数签名的类型推导。
使用std::packaged_task的主要意义:
- 可以把任意可调用对象封装为任务,在多线程环境下通过线程执行任务。
- 通过future可以在任务执行结束后获取执行结果。
- 任务对象可以传递所有权和移交执行责任。
- 用单一接口封装了异步任务的执行。
综上,std::packaged_task为异步任务的执行、传递、获取结果提供了统一的接口,很好地支持了C++11中的多线程编程。它让任务对象可以像一般对象那样移动和传递所有权,方便了异步编程模式的实现。
[task](){ (*task)(); }是一个lambda表达式,其作用是执行封装在task中的可调用对象。
具体分析:
- task是一个std::shared_ptr类型,包含了用户通过enqueue传入的可调用对象。
- emplace会将这个lambda表达式构造为一个函数对象,并放入任务队列tasks中。
- 当线程开始执行任务时,会调用该lambda表达式。
- lambda表达式中调用(*task)(),即执行task所封装的用户可调用对象。
- 这样就完成了对用户传入函数的异步执行。
所以[task](){ (*task)(); }的作用是隔离出task对象,在执行线程中调用它执行封装的用户函数。
通过这种方式,enqueue函数向任务队列存入的都是执行task的lambda表达式,而task本身作为shared_ptr被移动到了lambda内部,实现了任务对象的传递。线程执行lambda表达式时再激活task进行真正的执行。
这实现了任务对象的编程模式,使异步执行的框架更为清晰。
发表回复