在看这篇文章之前,请先确保大致了解过线程池的基本实现及其原理。如果不太了解,可以先去阅读一下这篇文章: (pthread线程池)

C++11 简单线程池

 下面的线程池是基于C++11的线程池。编写的思路跟前文的pthread实现的线程池基本一致。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <thread>  
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>

template <class T>
class ThreadPool
{
public:
ThreadPool(size_t thread_nums = std::thread::hardware_concurrency())
:_thread_nums(thread_nums)
, _stop(false)
{
for (size_t i = 0; i < _thread_nums; ++i)
{
_vt.emplace_back(std::thread(&ThreadPool::LoopWork, this));//将this指针也传过去
}
}

//禁用拷贝构造和operator=
ThreadPool(const ThreadPool &) = delete;
ThreadPool& operator=(const ThreadPool &) = delete;

private:
//线程的执行函数
void LoopWork()
{
std::unique_lock<std::mutex> ul(_mtx);
for (;;)
{
while (_taskQueue.empty() && !_stop)
{
_isEmpty.wait(ul);
}

//线程从wait中出来有2种情况: 1.有任务了 2.线程池stop为true
if (_taskQueue.empty()) {
ul.unlock(); //退出前要先解锁
break;
}
else {
T* task = std::move(_taskQueue.front());
_taskQueue.pop();
ul.unlock();
(*task)(); //任务类需要重载operator()()
ul.lock();
}
}
}

public:
void PushTask(T& task)
{
//通过{}控制lock_guard的作用域和生命周期
{
std::lock_guard<std::mutex> lg(_mtx);
_taskQueue.push(&task); //任务队列是临界资源(其他地方会修改)
}
_isEmpty.notify_one(); //条件变量的通知并不会因为多线程而影响结果(因此可以不加锁)
}


~ThreadPool()
{
_stop = true;
_isEmpty.notify_all();
for (size_t i = 0; i < _thread_nums; ++i)
{
if (_vt[i].joinable()) {
_vt[i].join();
}
}
}

private:
size_t _thread_nums;
std::vector<std::thread> _vt;
std::queue<T*> _taskQueue;
std::mutex _mtx;
std::condition_variable _isEmpty;
std::atomic<bool> _stop;
};

struct Task {
Task(int x, int y)
:_x(x),
_y(y)
{}

void operator()() {
std::cout << _x << " + " << _y << " = " << _x + _y << std::endl;
}

int _x;
int _y;
};

int main()
{
std::shared_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
Task t(1, 2);
tp->PushTask(t);
}

输出结果:

1 + 2 = 3

&emsp;

&emsp;

优化1 - 支持任意类型任务

&emsp;在前文的pthread线程池以及根据pthread改写的C++11线程池都是基于模板实现的。因为线程池需要能够接收不同类型的任务。但是将整个ThreadPool类设为模板其实不是最优解,因为当ThreadPool需要处理其它类型的任务时,还需要再实例化出一个新的ThreadPool类。它并没有实现一个线程池实例接收多种类型任务的功能。它实现的是多个线程池实例接收多种类型任务,每个线程池本质是只处理一种任务。

&emsp;为了解决上述问题,我们不应该将整个ThreadPool都设为模板类,我们的思路是让任务队列能够存放不同类型的任务。这里使用到了C++11中的function包装器,让任务队列里存放function<void()>类型的任务。而在PushTask()方法里,我们需要让该函数能够接收任意类型的任务,因此需要单独将PushTask()方法设为模板函数。

&emsp;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
#include <thread>  
#include <vector>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <functional>
#include <atomic>

class ThreadPool
{
public:
ThreadPool(size_t thread_nums = std::thread::hardware_concurrency())
:_thread_nums(thread_nums)
, _stop(false)
{
for (size_t i = 0; i < _thread_nums; ++i)
{
_vt.emplace_back(std::thread(&ThreadPool::LoopWork, this));//将this指针也传过去
}
}

//禁用拷贝构造和operator=
ThreadPool(const ThreadPool &) = delete;
ThreadPool& operator=(const ThreadPool &) = delete;

private:
//线程的执行函数
void LoopWork()
{
std::unique_lock<std::mutex> ul(_mtx);
for (;;)
{
while (_taskQueue.empty() && !_stop)
{
_isEmpty.wait(ul);
}

//线程从wait中出来有2种情况: 1.有任务了 2.线程池stop为true
if (_taskQueue.empty()) {
ul.unlock(); //退出前要先解锁
break;
}
else {
auto task = std::move(_taskQueue.front());
_taskQueue.pop();
ul.unlock();
task();
ul.lock();
}
}
}

public:
template<typename F>
void PushTask(F&& task)
{
{
std::lock_guard<std::mutex> lg(_mtx);
_taskQueue.push(std::forward<F>(task)); //任务队列是临界资源(其他地方会修改)
}
_isEmpty.notify_one(); //条件变量的通知并不会因为多线程而影响结果(因此可以不加锁)
}


~ThreadPool()
{
_stop = true;
_isEmpty.notify_all();
for (size_t i = 0; i < _thread_nums; ++i)
{
if (_vt[i].joinable()) {
_vt[i].join();
}
}
}

private:
size_t _thread_nums;
std::vector<std::thread> _vt;
std::queue<std::function<void()>> _taskQueue;//每个任务必须保证是: void返回值、无参
std::mutex _mtx;
std::condition_variable _isEmpty;
std::atomic<bool> _stop;
};

&emsp;

• 优化1 - 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
std::mutex gb_mtx;
void Add(int x, int y)
{
std::lock_guard<std::mutex> lg(gb_mtx); //为了保证输出结果不会打印错乱
std::cout << x << " + " << y << " = " << x + y << std::endl;
}

int main()
{
std::shared_ptr<ThreadPool> tp(new ThreadPool());
for (int i = 0; i < 10; ++i)
{
auto f = bind(Add, i, i + 1); //我们需要手动绑定一下函数参数
tp->PushTask(f);
}
return 0;
}

输出结果:

0 + 1 = 1
1 + 2 = 3
2 + 3 = 5
3 + 4 = 7
4 + 5 = 9
5 + 6 = 11
6 + 7 = 13
7 + 8 = 15
8 + 9 = 17
9 + 10 = 19

&emsp;

&emsp;

优化2 - 支持可变参数

&emsp;前一个版本的线程池是支持任意类型的任务的,但是我们在使用线程池的时候必须要手动bind一个函数对象,然后再传过去(这是因为任务队列要求任务必须是void返回值且无参数)。为了优化这个问题,我们可以使用可变参数模板+bind解决。下面给出PushTask()方法的优化: (除了PushTask, 其它地方没有任何修改)

1
2
3
4
5
6
7
8
9
10
11
template<typename F, typename... Args>
void PushTask(F&& task, Args&&... args)
{
//将可变参数包装起来, 包装后func的类型满足了"void返回值+无参"
auto func = std::bind(std::forward<F>(task), std::forward<Args>(args)...);
{
std::lock_guard<std::mutex> lg(_mtx);
_taskQueue.push(std::forward<decltype(func)>(func)); //传入我们包装好的函数对象
}
_isEmpty.notify_one(); //条件变量的通知并不会因为多线程而影响结果(因此可以不加锁)
}

• 优化2 - 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
std::mutex gb_mtx;
void Add(int x, int y)
{
std::lock_guard<std::mutex> lg(gb_mtx); //为了保证输出结果不会打印错乱
std::cout << x << " + " << y << " = " << x + y << std::endl;
}

int main()
{
std::shared_ptr<ThreadPool> tp(new ThreadPool());
for (int i = 0; i < 10; ++i)
{
//auto f = bind(Add, i, i + 1);
//tp->PushTask(f);
tp->PushTask(Add, i, i + 1); //我们不需要手动绑定了, 直接传参即可
}
return 0;
}

&emsp;输出结果与测试1的相同。

&emsp;

&emsp;

优化3 - 通过future获取任务函数的返回值

&emsp;在优化2的基础上,为了能够接收任务函数的返回值,并且还不能让线程阻塞。这里需要使用线程异步。我们在PushTask()中需要返回一个future类型的对象。下面是对PushTask()方法的修改。*(除了PushTask, 其它地方没有任何修改)*

注意: 记得包含一下<future>头文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
template<typename F, typename... Args>
auto PushTask(F&& task, Args&&... args) -> std::future<decltype(task(args...))>
{
//将可变参数包装起来
auto func = std::bind(std::forward<F>(task), std::forward<Args>(args)...);
auto task_ptr = std::make_shared<std::packaged_task<decltype(task(args...))()>>(func); //(1)
std::function<void()> wrapper_func = [task_ptr] {
(*task_ptr)();
};//(2)

{
std::lock_guard<std::mutex> lg(_mtx);
_taskQueue.push(wrapper_func);
}

_isEmpty.notify_one(); //条件变量的通知并不会因为多线程而影响结果(因此可以不加锁)
return task_ptr->get_future();//(3)
}

&emsp;在优化2的基础上,我们从(1)开始看。我们使用make_shared构造了一个share_ptr对象, 方便我们管理。这个智能指针的类型是package_task。详细介绍看这里: [[C++/Modern C++/Grammar/C++11 多线程操作#6.4 package_task]]。这个package_task中包装的函数对象的类型为decltype(task(args...))()。这个类型的含义为: 该函数对象的返回值的类型为task(args...)函数参数为空

&emsp;(2)是对智能指针再次进行了一层封装,目的为了能够将其作为参数传给任务队列。实际上这里可以省略不写的,然后更改_taskQueue.push(wrapper_func);。下面是省略的写法:

1
2
3
_taskQueue.push([task_ptr]{
(*task_ptr)();
});

&emsp;(3)返回智能指针管理的package_task的future对象。

&emsp;

• 优化3 - 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
int Add(int x, int y)
{
return x + y;
}

int main()
{
std::shared_ptr<ThreadPool> tp(new ThreadPool());
std::vector<std::future<int>> res; //future保存的就是函数的返回值
for (int i = 0; i < 10; ++i)
{
res.push_back(tp->PushTask(Add, i, i + 1));
}

for (auto& e : res)
{
std::cout << e.get() << std::endl;
}
return 0;
}

输出结果:

1
3
5
7
9
11
13
15
17
19

&emsp;

&emsp;

总结

&emsp;我们实现的pthread的线程池是基于模板的,因此当有不同类型的任务想要添加到线程池处理时,我们就必须额外创建一个对应类型的线程池对象,这个过程往往不是我们想看到的。为了解决这种问题,我们给出了三种递进式优化方案,每种都是基于前面一种的基础上作的进一步优化。

&emsp;优化1: 支持任意类型的任务函数。它的本质是让任务队列能够接收任意类型的任务函数,我们这里使用了function<void()>作为任务队列的类型,也就是说我们在PushTask()时需要事前包装好任务函数,保证其类型为void()。*(返回值为void, 参数为空)*

&emsp;优化2: 支持可变参数。这是通过可变参数模板+bind实现的。我们允许使用者直接传任务函数以及它的任意个参数。我们需要在push任务前包装好任务函数,保证任务函数为void()类型。而如何包装? 这里就需要使用bind进行包装了。
auto func = std::bind(std::forward<F>(task), std::forward<Args>(args)...);

&emsp;优化3: 通过future获取任务函数的返回值。这是通过线程异步实现的。获取子线程执行完毕的函数的返回值的方法就是使用异步。因此我们将func任务函数 包装到了package_task<decltype(task(args...))()>任务包当中,package_task中保存了future对象。这里我们使用了智能指针去管理package_task。由于我们最终的目的是需要将一个类型为void()的任务添加到任务队列。因此这里我们使用了lambda表达式对智能指针再次进行了一层封装,里面包装了智能指针去调用任务函数的过程。lambada表达式的类型就是void(),因此我们可以将其添加到任务队列当中。后续取出任务后,直接像调用函数一样执行任务即可。*(因为任务队列当中存的都是一个个的function<void()>函数对象, 执行函数对象的方法就是使用operator())*

杂谈:

&emsp;个人觉得能够掌握前两种优化就够了。第三种相对比较难懂一些,并且没有太大的必要。我们之所以会实现第三种是因为想要获取任务函数的返回值! 因为我们在优化2的状态下也能够获取到任务的返回值。我们可以传入一个输出型参数即可。示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
void Add(int x, int y, int& res)
{
res = x + y;
}

int main()
{
std::shared_ptr<ThreadPool<Task>> tp(new ThreadPool<Task>());
int res;
tp->PushTask(Add, 1, 2, std::ref(res)); //注意, 这里必须要使用ref()引用传参!!!
std::cout << res << std::endl;
return 0;
}

输出结果: 3

&emsp;

: 还有很多人会将taskQueue任务队列封装成一个类(SafeQueue),然后封装任务队列的各种接口,将各种加锁、解锁操作都封装到了该类里面。外界在使用该类时,不需要再额外考虑锁的问题了。

&emsp;

参考:

Github: mtrebi/thread-pool: Thread pool implementation using c++11 threads

异步实现: https://zhuanlan.zhihu.com/p/367309864