在看这篇文章之前,请先确保大致了解过线程池的基本实现及其原理。如果不太了解,可以先去阅读一下这篇文章: (pthread线程池 )
C++11 简单线程池  下面的线程池是基于C++11的线程池。编写的思路跟前文的pthread实现的线程池基本一致。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 #include <thread> #include <vector> #include <queue> #include <mutex> #include <condition_variable> #include <functional> #include <atomic> template <class T >class ThreadPool { public : ThreadPool (size_t thread_nums = std::thread::hardware_concurrency ()) :_thread_nums(thread_nums) , _stop(false ) { for (size_t i = 0 ; i < _thread_nums; ++i) { _vt.emplace_back (std::thread (&ThreadPool::LoopWork, this )); } } ThreadPool (const ThreadPool &) = delete ; ThreadPool& operator =(const ThreadPool &) = delete ; private : void LoopWork () { std::unique_lock<std::mutex> ul (_mtx) ; for (;;) { while (_taskQueue.empty () && !_stop) { _isEmpty.wait (ul); } if (_taskQueue.empty ()) { ul.unlock (); break ; } else { T* task = std::move (_taskQueue.front ()); _taskQueue.pop (); ul.unlock (); (*task)(); ul.lock (); } } } public : void PushTask (T& task) { { std::lock_guard<std::mutex> lg (_mtx) ; _taskQueue.push (&task); } _isEmpty.notify_one (); } ~ThreadPool () { _stop = true ; _isEmpty.notify_all (); for (size_t i = 0 ; i < _thread_nums; ++i) { if (_vt[i].joinable ()) { _vt[i].join (); } } } private : size_t _thread_nums; std::vector<std::thread> _vt; std::queue<T*> _taskQueue; std::mutex _mtx; std::condition_variable _isEmpty; std::atomic<bool > _stop; }; struct Task { Task (int x, int y) :_x(x), _y(y) {} void operator () () { std::cout << _x << " + " << _y << " = " << _x + _y << std::endl; } int _x; int _y; }; int main () { std::shared_ptr<ThreadPool<Task>> tp (new ThreadPool <Task>()); Task t (1 , 2 ) ; tp->PushTask (t); }
输出结果 :
1 + 2 = 3
 
 
优化1 - 支持任意类型任务  在前文的pthread线程池以及根据pthread改写的C++11线程池都是基于模板实现的。因为线程池需要能够接收不同类型的任务。但是将整个ThreadPool
类设为模板其实不是最优解,因为当ThreadPool需要处理其它类型的任务时,还需要再实例化出一个新的ThreadPool类。它并没有实现一个线程池实例接收多种类型任务 的功能。它实现的是多个线程池实例接收多种类型任务,每个线程池本质是只处理一种任务。
 为了解决上述问题,我们不应该将整个ThreadPool都设为模板类,我们的思路是让任务队列能够存放不同类型的任务 。这里使用到了C++11中的function
包装器,让任务队列里存放function<void()>
类型的任务。而在PushTask()
方法里,我们需要让该函数能够接收任意类型的任务,因此需要单独将PushTask()
方法设为模板函数。
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 #include <thread> #include <vector> #include <queue> #include <mutex> #include <condition_variable> #include <functional> #include <atomic> class ThreadPool { public : ThreadPool (size_t thread_nums = std::thread::hardware_concurrency ()) :_thread_nums(thread_nums) , _stop(false ) { for (size_t i = 0 ; i < _thread_nums; ++i) { _vt.emplace_back (std::thread (&ThreadPool::LoopWork, this )); } } ThreadPool (const ThreadPool &) = delete ; ThreadPool& operator =(const ThreadPool &) = delete ; private : void LoopWork () { std::unique_lock<std::mutex> ul (_mtx) ; for (;;) { while (_taskQueue.empty () && !_stop) { _isEmpty.wait (ul); } if (_taskQueue.empty ()) { ul.unlock (); break ; } else { auto task = std::move (_taskQueue.front ()); _taskQueue.pop (); ul.unlock (); task (); ul.lock (); } } } public : template <typename F> void PushTask (F&& task) { { std::lock_guard<std::mutex> lg (_mtx) ; _taskQueue.push (std::forward<F>(task)); } _isEmpty.notify_one (); } ~ThreadPool () { _stop = true ; _isEmpty.notify_all (); for (size_t i = 0 ; i < _thread_nums; ++i) { if (_vt[i].joinable ()) { _vt[i].join (); } } } private : size_t _thread_nums; std::vector<std::thread> _vt; std::queue<std::function<void ()>> _taskQueue; std::mutex _mtx; std::condition_variable _isEmpty; std::atomic<bool > _stop; };
 
• 优化1 - 测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 std::mutex gb_mtx; void Add (int x, int y) { std::lock_guard<std::mutex> lg (gb_mtx) ; std::cout << x << " + " << y << " = " << x + y << std::endl; } int main () { std::shared_ptr<ThreadPool> tp (new ThreadPool()) ; for (int i = 0 ; i < 10 ; ++i) { auto f = bind (Add, i, i + 1 ); tp->PushTask (f); } return 0 ; }
输出结果 :
0 + 1 = 1 1 + 2 = 3 2 + 3 = 5 3 + 4 = 7 4 + 5 = 9 5 + 6 = 11 6 + 7 = 13 7 + 8 = 15 8 + 9 = 17 9 + 10 = 19
 
 
优化2 - 支持可变参数  前一个版本的线程池是支持任意类型的任务的,但是我们在使用线程池的时候必须要手动bind一个函数对象,然后再传过去(这是因为任务队列要求任务必须是void返回值且无参数 )。为了优化这个问题,我们可以使用可变参数模板+bind解决 。下面给出PushTask()
方法的优化: (除了PushTask, 其它地方没有任何修改)
1 2 3 4 5 6 7 8 9 10 11 template <typename F, typename ... Args>void PushTask (F&& task, Args&&... args) { auto func = std::bind (std::forward<F>(task), std::forward<Args>(args)...); { std::lock_guard<std::mutex> lg (_mtx) ; _taskQueue.push (std::forward<decltype (func)>(func)); } _isEmpty.notify_one (); }
• 优化2 - 测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 std::mutex gb_mtx; void Add (int x, int y) { std::lock_guard<std::mutex> lg (gb_mtx) ; std::cout << x << " + " << y << " = " << x + y << std::endl; } int main () { std::shared_ptr<ThreadPool> tp (new ThreadPool()) ; for (int i = 0 ; i < 10 ; ++i) { tp->PushTask (Add, i, i + 1 ); } return 0 ; }
 输出结果与测试1的相同。
 
 
优化3 - 通过future获取任务函数的返回值  在优化2的基础上,为了能够接收任务函数的返回值,并且还不能让线程阻塞。这里需要使用线程异步。我们在PushTask()
中需要返回一个future
类型的对象。下面是对PushTask()
方法的修改。*(除了PushTask, 其它地方没有任何修改)*
注意: 记得包含一下<future>
头文件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 template <typename F, typename ... Args>auto PushTask (F&& task, Args&&... args) -> std::future<decltype (task(args...)) > { auto func = std::bind (std::forward<F>(task), std::forward<Args>(args)...); auto task_ptr = std::make_shared<std::packaged_task<decltype (task (args...))()>>(func); std::function<void ()> wrapper_func = [task_ptr] { (*task_ptr)(); }; { std::lock_guard<std::mutex> lg (_mtx) ; _taskQueue.push (wrapper_func); } _isEmpty.notify_one (); return task_ptr->get_future (); }
 在优化2的基础上,我们从(1)开始看。我们使用make_shared
构造了一个share_ptr对象, 方便我们管理。这个智能指针的类型是package_task
。详细介绍看这里: [[C++/Modern C++/Grammar/C++11 多线程操作#6.4 package_task]]。这个package_task
中包装的函数对象的类型为decltype(task(args...))()
。这个类型的含义为: 该函数对象的返回值的类型为task(args...)
, 函数参数为空 。
 (2)是对智能指针再次进行了一层封装,目的为了能够将其作为参数传给任务队列。实际上这里可以省略不写的,然后更改_taskQueue.push(wrapper_func);
。下面是省略的写法:
1 2 3 _taskQueue.push ([task_ptr]{ (*task_ptr)(); });
 (3)返回智能指针管理的package_task的future对象。
 
• 优化3 - 测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 int Add (int x, int y) { return x + y; } int main () { std::shared_ptr<ThreadPool> tp (new ThreadPool()) ; std::vector<std::future<int >> res; for (int i = 0 ; i < 10 ; ++i) { res.push_back (tp->PushTask (Add, i, i + 1 )); } for (auto & e : res) { std::cout << e.get () << std::endl; } return 0 ; }
输出结果 :
1 3 5 7 9 11 13 15 17 19
 
 
总结  我们实现的pthread的线程池是基于模板的,因此当有不同类型 的任务想要添加到线程池处理时,我们就必须额外创建一个对应类型的线程池对象,这个过程往往不是我们想看到的。为了解决这种问题,我们给出了三种递进式优化方案,每种都是基于前面一种的基础上作的进一步优化。
 优化1: 支持任意类型的任务函数。它的本质是让任务队列能够接收任意类型的任务函数,我们这里使用了function<void()>
作为任务队列的类型,也就是说我们在PushTask()
时需要事前包装好任务函数,保证其类型为void()
。*(返回值为void, 参数为空)*
 优化2: 支持可变参数。这是通过可变参数模板+bind实现的。我们允许使用者直接传任务函数以及它的任意个参数。我们需要在push任务前包装好任务函数,保证任务函数为void()
类型。而如何包装? 这里就需要使用bind进行包装了。auto func = std::bind(std::forward<F>(task), std::forward<Args>(args)...);
 优化3: 通过future获取任务函数的返回值。这是通过线程异步实现的。获取子线程执行完毕的函数的返回值的方法就是使用异步。因此我们将func
任务函数 包装到了package_task<decltype(task(args...))()>
任务包当中,package_task
中保存了future对象。这里我们使用了智能指针去管理package_task
。由于我们最终的目的是需要将一个类型为void()
的任务添加到任务队列。因此这里我们使用了lambda表达式对智能指针再次进行了一层封装,里面包装了智能指针去调用任务函数的过程。lambada表达式的类型就是void()
,因此我们可以将其添加到任务队列当中。后续取出任务后,直接像调用函数一样执行任务即可。*(因为任务队列当中存的都是一个个的function<void()>函数对象, 执行函数对象的方法就是使用operator())*
杂谈 :
 个人觉得能够掌握前两种优化就够了。第三种相对比较难懂一些,并且没有太大的必要。我们之所以会实现第三种是因为想要获取任务函数的返回值! 因为我们在优化2
的状态下也能够获取到任务的返回值。我们可以传入一个输出型参数即可。示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 void Add (int x, int y, int & res) { res = x + y; } int main () { std::shared_ptr<ThreadPool<Task>> tp (new ThreadPool <Task>()); int res; tp->PushTask (Add, 1 , 2 , std::ref (res)); std::cout << res << std::endl; return 0 ; }
输出结果 : 3
 
注 : 还有很多人会将taskQueue
任务队列封装成一个类(SafeQueue
),然后封装任务队列的各种接口,将各种加锁、解锁操作都封装到了该类里面。外界在使用该类时,不需要再额外考虑锁的问题了。
 
参考 :
Github: mtrebi/thread-pool: Thread pool implementation using c++11 threads
异步实现: https://zhuanlan.zhihu.com/p/367309864