pthread线程池
 线程池是一种线程的使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时 创建与销毁线程的代价。线程池不仅能保证内核的充分利用,还能防止过度调度。*//可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。*
• 线程池的优点
- 当有任务时,能够立刻让进程去处理,省掉了创建进程所需要的时间
- 能够有效防止server中线程过多而导致的系统过载的问题
• 线程池的应用场景
- 场景需要大量的进程来完成任务,且完成任务的时间相对较短。(这样的话,使用线程池后,就可以大幅度的提高由于创建线程而消耗的时间)
- 对性能的要求苛刻的应用。如:要求服务器迅速响应客户的请求
- 需要接受突发性的大量请求,而不至于使服务器因此产生大量线程的场景。
 
线程池的模拟实现
 我们使用线程池的目的是想要提前创建一批线程,让这批线程帮助我们执行任务。具体谈一谈就是:生产者线程从网络中获取任务,然后将任务放到任务队列当中,随后通知消费者线程从任务队列当中取任务,消费者线程在取完任务后处理任务。
ThreadPool类的成员变量:
我们需要一个任务队列用于存放任务,这里直接使用阻塞队列taskQueue
作为任务队列了*(实际上还可以使用环形队列等)*。
任务队列是公共资源,那么就要进行保护。因此我们需要一个互斥锁mutex。
线程池的运作流程是从 生产者线程获取任务 -> Push到任务队列中 -> 消费者线程Get任务 -> 消费者线程处理任务。在存取任务时,可能会出现任务队列为空的情况,因此我们需要一个条件变量cond以让消费者线程在该条件下等待。
注意:任务队列不可能满!任务队列一旦满了就意味着你的生产者线程无法再获取外界的任务了。一个无法获取任务的服务器就没什么用了。所以我们不需要第二个条件变量让生产者线程等待。
 
• ThreadPool类成员
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| #define NUM 5 template <class T> class ThreadPool { public: bool IsEmpty() { return taskQueue.empty(); }; void LockQueue() { pthread_mutex_lock(&mtx); }; void UnlockQueue() {pthread_mutex_unlock(&mtx); }; void ThreadWait() { pthread_cond_wait(&cond, &mtx); }; public: ThreadPool(int num = NUM) :threadNum(Num) { pthread_mutex_init(&mtx, nullptr); pthread_cond_init(&cond, nullptr); } void PoolInit(); static void* Routine(void* arg);
void Get(T& task); void Push(T& task);
~ThreadPool() { pthread_mutex_destroy(&mtx); pthread_cond_destroy(&cond); } private: std::queue<T*> taskQueue; int threadNum; pthread_mutex_t mtx; pthread_cond_t cond; };
|
 
• 线程池的初始化
1 2 3 4 5 6 7 8 9 10 11 12 13
| void PoolInit() { pthread_t t; for(int i = 0; i < threadNum; ++i) { int ret = pthread_create(&t, nullptr, Routine, this); if(ret < 0) { std::cout << "thread create failed!" << std::endl; exit(1); } } }
|
 
• 消费者线程的启动函数 - 处理任务
 定义为static的原因在于pthread_create
函数要求线程的启动函数参数必须是void*类型的,而类的成员函数的第一个参数都是隐含的this指针,this指针的类型不满足要求,因此我们必须要舍弃this指针,而定义成静态的成员函数就没有this指针了。如果我们想要在静态成员函数中访问非静态的成员的话,只能通过传参的时候传入一个this指针,然后通过this指针去访问。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| static void* Routine(void* arg) { pthread_detach(pthread_self()); ThreadPool<T>* pthis = (ThreadPool<T>*)arg; while(true) { pthis->LockQueue(); while(pthis->IsEmpty()) { usleep(5000); pthis->ThreadWait(); }
T data; pthis->Get(data); pthis->UnlockQueue(); data.Run(); } }
|
 
• Push 与 Get 任务
 Get中不需要在进行加锁了,因为消费者线程在调用Get函数时已经处于加锁状态了。有一些线程池的实现中,也会直接省略掉Get()
方法,直接在线程工作方法中获取任务了。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| void Get(T& task) { T* tmp = taskQueue.front(); task = *tmp; taskQueue.pop(); }
void Push(T& task) { LockQueue(); taskQueue.push(&task); UnlockQueue(); pthread_cond_signal(&cond); }
|
 
线程池测试
• Task任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| class Task { public: Task(int base = 10) :_base(base) {} void Run() { std::cout << "ThreadID:["<< pthread_self() << "]"; std::cout << " Run Task --- Base: " << _base * _base << std::endl; } ~Task(){} private: int _base; };
|
• 测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| #include "ThreadPool.h" int main() { ThreadPool<Task>* tp = new ThreadPool<Task>(); tp->PoolInit(); while(true) { int base = rand() % 10 + 1; Task t(base); tp->Push(t); sleep(1); } return 0; }
|
 
 
pthread线程池整体代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129
| #ifndef __THREAD_POOL_H_ #define __THREAD_POOL_H_ #include <iostream> #include <unistd.h> #include <pthread.h> #include <queue> #define NUM 5
class Task { public: Task(int base = NUM) :_base(base) {}
void Run() { std::cout << "ThreadID:["<< pthread_self() << "]"; std::cout << " Run Task --- Base: " << _base * _base << std::endl; } ~Task(){} private: int _base; };
template <typename T> class ThreadPool { private: bool isEmpty() { return q.empty(); }
static void* Routine(void* arg) { pthread_detach(pthread_self()); while(true) { ThreadPool* this_p = (ThreadPool*)arg; pthread_mutex_lock(&this_p->lock); while(!this_p->quit && this_p->isEmpty()) { pthread_cond_wait(&this_p->cond, &this_p->lock); }
if(!this_p->isEmpty()) { T data; this_p->Get(data); pthread_mutex_unlock(&this_p->lock); data.Run(); } else { pthread_mutex_unlock(&this_p->lock); } } }
public: bool PoolInit() { pthread_t tid; for(int i = 0; i < max_cap; ++i) { int ret = pthread_create(&tid, nullptr, Routine, this); if(ret != 0) { std::cout << "thread create error!" << '\n'; return false; } } return true; } public: ThreadPool(size_t _cap = NUM) :max_cap(_cap), quit(false) { pthread_mutex_init(&lock, nullptr); pthread_cond_init(&cond, nullptr); }
void Get(T& data) { T* tmp = q.front(); data = *tmp; q.pop(); }
void Put(T& data) { pthread_mutex_lock(&lock); q.push(&data); pthread_mutex_unlock(&lock); pthread_cond_signal(&cond); }
void ThreadQuit() { quit = true; while(!isEmpty()) pthread_cond_broadcast(&cond); }
~ThreadPool() { pthread_mutex_destroy(&lock); pthread_cond_destroy(&cond); } private: std::queue<T*> q; size_t max_cap; pthread_mutex_t lock; pthread_cond_t cond; bool quit; };
#endif
|