ThreadPoolprogschj代码解读

ThreadPool

A simple C++11 Thread Pool implementation.

https://github.com/progschj/ThreadPool

class ThreadPool {
public:
    ThreadPool(size_t);
    template<class F, class... Args>
    auto enqueue(F&& f, Args&&... args) 
        -> std::future<typename std::result_of<F(Args...)>::type>;
    ~ThreadPool();
private:
    // need to keep track of threads so we can join them
    std::vector< std::thread > workers;
    // the task queue
    std::queue< std::function<void()> > tasks;
    
    // synchronization
    std::mutex queue_mutex;
    std::condition_variable condition;
    bool stop;
};

F&& 在这里的作用是进行完美转发(perfect forwarding)。

完美转发的目的是保持参数调用时的左值或右值的属性,并避免不必要的拷贝。

在 enqueue 方法中,参数 f 可以是左值或右值,例如:

void func(int, double); 

auto f1 = func; // f1是左值
enqueue(f1, 1, 2.3); 

enqueue(func, 1, 2.3); // func是右值

如果定义为 enqueue(F f),那么即使调用时传递的是右值,f也会强制转成左值引用,导致一次额外的拷贝操作。

而 F&& 可以避免这种强制转换:

  • 当传入左值时,F&& 会转成 F&,保持左值属性
  • 当传入右值时,F&& 保持右值属性

这样,通过 F&& 完美转发,既可以传递左值,也可以传递右值,而且不会发生额外拷贝,提高了效率。

所以 F&& 在这里就是用于实现完美转发,根据调用传入的参数类型选择最匹配的引用形式,避免意外拷贝,保持参数的左值或右值属性不变。


Args&& … 在这里表示的是对参数 args 进行完美转发。

其中:

Args&& 表示 args 是右值引用,既可以绑定到左值,也可以绑定到右值。

… 表示 Args 是参数包,可以接受 0 个或多个参数。

这样,Args&&… 允许传入左值或右值参数,并且不会发生额外的引用转换或拷贝:

  • 当传入左值时,Args&& 会隐式转为 Args&,作为左值引用使用
  • 当传入右值时,Args&& 保持右值引用

例如:

int x = 1;
double y = 2.3; 

enqueue(func, x, y); // x, y是左值
enqueue(func, 1, 2.3); // 1, 2.3是右值

通过 Args&&… ,上述调用都可以正确转发,既可以处理左右值混合的情况,也可以处理全部左值或全部右值的情况。

所以 Args&&… 的作用就是实现对参数的完美转发,根据参数的实际类型进行引用折叠,避免额外的拷贝或转换。保持了参数调用时的左值/右值属性不变。


在 enqueue 函数的返回类型中:

auto enqueue(...) -> std::future<typename std::result_of<F(Args...)>::type>

-> 表示这是一个trailing return type语法,也就是返回类型后置语法。

trailing return type要求在函数参数声明之后再写返回类型。与普通的返回类型声明不同的是,这种语法可以在参数声明后才确定返回类型。

这里之所以需要后置返回类型语法,是因为返回类型依赖于参数F和Args的类型,需要先声明参数才能确定返回类型。

std::result_of::type 就是获取调用f(args…)的结果类型。

然后将其包装成std::future返回。

所以-> 在这里的作用就是指示这是一个后置返回类型语法,而返回类型需要依赖函数参数才能确定。


在这个返回类型声明中:

auto enqueue(...) -> std::future<typename std::result_of<F(Args...)>::type>

std::result_of::type 的作用是获取调用函数F时的返回类型。

std::result_of是一个模板类,用于通过参数推导结果类型。其中:

  • F(Args…) 表示调用函数F并传入参数Args
  • std::result_of 将推导出调用的结果类型
  • ::type 取出结果类型

整个表示调用F(Args…)后返回的类型。

之所以需要推导返回类型,是因为函数F的具体类型在enqueue时是不确定的,需要通过参数推导。

比如:

int func(double);

enqueue(func, 2.3);

这里func的返回类型是int,通过 std::result_of 可以推导出调用的返回类型是int,然后封装到future中返回。

如果不使用std::result_of,则无法推导出具体的返回类型,因为函数F的类型不确定。

所以 std::result_of 在这里是用来根据参数推导出不确定函数调用的返回类型的,从而可以动态地返回future。


在这个返回类型中:

auto enqueue(...) -> std::future<typename std::result_of<F(Args...)>::type> 

std::future 表示返回一个异步操作的未来结果的对象。

在线程池的enqueue操作中,把任务函数F打包进线程池执行,会得到一个未来结果。所以返回一个std::future对象,然后主线程可以在需要结果的时候调用future的get方法获取结果。

std::future的主要作用有:

  1. 可以获取异步操作的返回值 – 通过future的get()方法
  2. 提供同步机制 – 通过wait()等可以等待future Ready
  3. 当异步操作完成时,可以通过回调函数获取结果

所以在线程池中,返回std::future可以让调用者在需要结果时获取函数调用的返回值,并可以同步等待或通过回调方式获取结果。

未来对象也表示线程池会负责执行这个异步任务,调用者只需要关心结果即可。

总之,返回std::future实现了调用者与异步线程池任务执行之间的同步和数据传递。调用者可以通过future协调异步操作。


// the constructor just launches some amount of workers
inline ThreadPool::ThreadPool(size_t threads)
    :   stop(false)
{
    for(size_t i = 0;i<threads;++i)
        workers.emplace_back(
            [this]
            {
                for(;;)
                {
                    std::function<void()> task;
                    {
                        std::unique_lock<std::mutex> lock(this->queue_mutex);
                        this->condition.wait(lock,
                            [this]{ return this->stop || !this->tasks.empty(); });
                        if(this->stop && this->tasks.empty())
                            return;
                        task = std::move(this->tasks.front());
                        this->tasks.pop();
                    }

                    task();
                }
            }
        );
}

这段代码是实现一个线程池的构造函数。

主要逻辑是:

  • 在构造函数中,根据传入的线程数量threads,启动相应数量的线程工作者(worker)。
  • 每个工作者是一个std::thread,在一个无限循环中等待任务。
  • 使用condition变量在队列为空时等待。
  • 当有新任务时,获取队列头部的任务,并执行。
  • 当设置了stop标志并且队列为空时,工作者线程退出。

具体分析:

  • stop是一个原子布尔变量,用于控制线程池的停止。
  • queue_mutex是保护任务队列的互斥锁。
  • condition是一个条件变量,用于线程等待。
  • tasks是任务队列,用std::queue实现。
  • 工作线程通过unique_lock获取队列的锁,在任务队列为空时wait,有新任务时执行。
  • 当线程池要停止时,设置stop=true,并通知所有线程(condition.notify_all())
  • 每个线程检查stop==true且任务队列为空时退出。

这样通过条件变量的等待通知机制,实现了线程池的基本调度功能。

总体上,利用C++11的多线程和同步特性,实现了一个基础的线程池。


在线程池构造函数中,有以下代码:

workers.emplace_back(
    [this]
{ // ... } );
    

这里的[this]是一个lambda表达式的捕获列表,它将ThreadPool对象的this指针捕获到了lambda闭包中。

这样做的目的是在lambda函数体内可以访问ThreadPool对象的成员变量和成员函数。

具体来看,对这个lambda的作用有:

  1. 可以访问ThreadPool的成员变量queue_mutex、condition、stop、tasks等,用于实现线程池的逻辑。
  2. 通过this指针调用ThreadPool的其他成员函数,如加入新任务、设置停止等。
  3. 手动对this进行生命周期管理,因为线程函数需要在ThreadPool对象生命周期内运作。

所以这个第一个this的作用是捕获ThreadPool对象指针,使线程函数可以访问该对象的公共接口,与对象本身进行交互。

如果不捕获this,则lambda函数内部无法访问ThreadPool的成员,也无法与之交互,无法实现线程池的功能。

所以它起到了连接线程函数与线程池对象的作用,是实现这个线程池的关键。


在线程池构造函数中的线程工作者(worker)循环中:

for(;;) 
{
    // 获取任务,执行任务
}

使用了一个无限循环for(;;)

这是为了让每个工作线程持续不断地运行,一直接收取任务并执行。

之所以需要一个无限循环,是因为线程池要求线程工作者永远在后台运行,等待接收新的任务。

如果没有无限循环,那么执行完当前任务后线程就会退出,无法继续处理后续任务。

具体作用是:

  • 在循环内等待条件变量,当有新任务时获取任务
  • 循环体内执行任务
  • 然后再次循环等待新任务

这样就可以重复使用同一个线程处理无限多个任务,直到线程池被关闭。

所以这个无限循环的作用是让线程工作者持续运行,反复处理任务,实现线程池的重用与调度功能。

无限循环也避免了重复创建/销毁线程的性能开销。

所以它是线程池实现中的一种很常见的技术。


在这个线程池的实现中,this->condition 表示一个 std::condition_variable 类型的条件变量,它的作用是线程间的同步和通信。

具体的使用方式是:

  1. 在获取互斥锁后,使用 condition.wait() 让线程进入等待状态。
  2. 当需要通知时,在其他线程中调用 condition.notify_one()/notify_all() 唤醒等待的线程。
  3. 被唤醒的线程从 wait() 返回后就可以重新锁定互斥锁,然后继续执行。

示例代码:

// 等待线程
std::unique_lock<std::mutex> lock(mutex);
condition.wait(lock, []{return ready;}); 

// 唤醒线程
{
  std::lock_guard<std::mutex> lock(mutex);
  ready = true;
}
condition.notify_one();

在这个线程池中,具体使用方式是:

  • worker 线程等待在 condition.wait() 上,直到任务队列不空
  • 当有新任务加入时,调用 condition.notify_one() 唤醒一个 worker
  • 被唤醒的 worker 会从 wait() 返回,并执行任务

所以 this->condition 作为条件变量,实现了线程池的任务调度逻辑,是线程池实现的关键组件。


// add new work item to the pool
template<class F, class... Args>
auto ThreadPool::enqueue(F&& f, Args&&... args) 
    -> std::future<typename std::result_of<F(Args...)>::type>
{
    using return_type = typename std::result_of<F(Args...)>::type;

    auto task = std::make_shared< std::packaged_task<return_type()> >(
            std::bind(std::forward<F>(f), std::forward<Args>(args)...)
        );
        
    std::future<return_type> res = task->get_future();
    {
        std::unique_lock<std::mutex> lock(queue_mutex);

        // don't allow enqueueing after stopping the pool
        if(stop)
            throw std::runtime_error("enqueue on stopped ThreadPool");

        tasks.emplace([task](){ (*task)(); });
    }
    condition.notify_one();
    return res;
}

std::packaged_task的作用是把一个可调用对象(如函数、lambda表达式)包装成一个任务对象。

std::packaged_task出现在C++11标准中,它提供了以下主要功能:

  1. 打包(包装)一个可调用对象,生成一个任务对象。
  2. 从任务对象中获取一个std::future对象,用于获取任务执行结果。
  3. 移动任务对象以传递所有权。
  4. 支持函数签名的类型推导。

使用std::packaged_task的主要意义:

  1. 可以把任意可调用对象封装为任务,在多线程环境下通过线程执行任务。
  2. 通过future可以在任务执行结束后获取执行结果。
  3. 任务对象可以传递所有权和移交执行责任。
  4. 用单一接口封装了异步任务的执行。

综上,std::packaged_task为异步任务的执行、传递、获取结果提供了统一的接口,很好地支持了C++11中的多线程编程。它让任务对象可以像一般对象那样移动和传递所有权,方便了异步编程模式的实现。


[task](){ (*task)(); }是一个lambda表达式,其作用是执行封装在task中的可调用对象。

具体分析:

  1. task是一个std::shared_ptr类型,包含了用户通过enqueue传入的可调用对象。
  2. emplace会将这个lambda表达式构造为一个函数对象,并放入任务队列tasks中。
  3. 当线程开始执行任务时,会调用该lambda表达式。
  4. lambda表达式中调用(*task)(),即执行task所封装的用户可调用对象。
  5. 这样就完成了对用户传入函数的异步执行。

所以[task](){ (*task)(); }的作用是隔离出task对象,在执行线程中调用它执行封装的用户函数。

通过这种方式,enqueue函数向任务队列存入的都是执行task的lambda表达式,而task本身作为shared_ptr被移动到了lambda内部,实现了任务对象的传递。线程执行lambda表达式时再激活task进行真正的执行。

这实现了任务对象的编程模式,使异步执行的框架更为清晰。


已发布

分类

,

来自

标签:

评论

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注