pthread线程池

 线程池是一种线程的使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时 创建与销毁线程的代价。线程池不仅能保证内核的充分利用,还能防止过度调度。*//可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。*

• 线程池的优点

  1. 当有任务时,能够立刻让进程去处理,省掉了创建进程所需要的时间
  2. 能够有效防止server中线程过多而导致的系统过载的问题

• 线程池的应用场景

  1. 场景需要大量的进程来完成任务,且完成任务的时间相对较短。(这样的话,使用线程池后,就可以大幅度的提高由于创建线程而消耗的时间)
  2. 对性能的要求苛刻的应用。如:要求服务器迅速响应客户的请求
  3. 需要接受突发性的大量请求,而不至于使服务器因此产生大量线程的场景。

 

线程池的模拟实现

 我们使用线程池的目的是想要提前创建一批线程,让这批线程帮助我们执行任务。具体谈一谈就是:生产者线程从网络中获取任务,然后将任务放到任务队列当中,随后通知消费者线程从任务队列当中取任务,消费者线程在取完任务后处理任务。

ThreadPool类的成员变量

  1. 我们需要一个任务队列用于存放任务,这里直接使用阻塞队列taskQueue作为任务队列了*(实际上还可以使用环形队列等)*。

  2. 任务队列是公共资源,那么就要进行保护。因此我们需要一个互斥锁mutex。

  3. 线程池的运作流程是从 生产者线程获取任务 -> Push到任务队列中 -> 消费者线程Get任务 -> 消费者线程处理任务。在存取任务时,可能会出现任务队列为空的情况,因此我们需要一个条件变量cond以让消费者线程在该条件下等待。

注意:任务队列不可能满!任务队列一旦满了就意味着你的生产者线程无法再获取外界的任务了。一个无法获取任务的服务器就没什么用了。所以我们不需要第二个条件变量让生产者线程等待。

 

• ThreadPool类成员

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
#define NUM 5	//NUM是创建的线程的数量
template <class T>
class ThreadPool
{
public:
bool IsEmpty() { return taskQueue.empty(); };
void LockQueue() { pthread_mutex_lock(&mtx); };
void UnlockQueue() {pthread_mutex_unlock(&mtx); };
void ThreadWait() { pthread_cond_wait(&cond, &mtx); };
public:
ThreadPool(int num = NUM)
:threadNum(Num)
{
pthread_mutex_init(&mtx, nullptr);
pthread_cond_init(&cond, nullptr);
}

void PoolInit(); //线程池的初始化
static void* Routine(void* arg); //线程的启动函数

void Get(T& task); //获取任务
void Push(T& task); //放任务

~ThreadPool()
{
pthread_mutex_destroy(&mtx);
pthread_cond_destroy(&cond);
}
private:
std::queue<T*> taskQueue; //任务队列
int threadNum; //创建的线程的数量
pthread_mutex_t mtx; //互斥锁
pthread_cond_t cond; //只有消费者线程需要等待(在任务队列为空的时候等待),而生产者线程不需要等待,因为任务队列一般满不了
};

&emsp;

• 线程池的初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
void PoolInit()
{
pthread_t t;
for(int i = 0; i < threadNum; ++i)
{
int ret = pthread_create(&t, nullptr, Routine, this);//传this的原因下面解释
if(ret < 0)
{
std::cout << "thread create failed!" << std::endl;
exit(1);
}
}
}

&emsp;

• 消费者线程的启动函数 - 处理任务

&emsp;定义为static的原因在于pthread_create函数要求线程的启动函数参数必须是void*类型的,而类的成员函数的第一个参数都是隐含的this指针,this指针的类型不满足要求,因此我们必须要舍弃this指针,而定义成静态的成员函数就没有this指针了。如果我们想要在静态成员函数中访问非静态的成员的话,只能通过传参的时候传入一个this指针,然后通过this指针去访问。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
static void* Routine(void* arg)
{
pthread_detach(pthread_self()); //线程分离
ThreadPool<T>* pthis = (ThreadPool<T>*)arg;
while(true)
{
pthis->LockQueue();
//任务队列空的话就让线程等待
while(pthis->IsEmpty())
{
usleep(5000);
pthis->ThreadWait();
}

//任务队列不为空,处理任务
T data;
pthis->Get(data);
pthis->UnlockQueue();
data.Run(); //每个任务的data都是私有的,因此不需要在临界区处理任务
}
}

&emsp;

• Push 与 Get 任务

&emsp;Get中不需要在进行加锁了,因为消费者线程在调用Get函数时已经处于加锁状态了。有一些线程池的实现中,也会直接省略掉Get()方法,直接在线程工作方法中获取任务了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
void Get(T& task)
{
T* tmp = taskQueue.front();
task = *tmp;
taskQueue.pop();
}

void Push(T& task) //这里不能加const,const变量无法取地址
{
LockQueue();
taskQueue.push(&task);
UnlockQueue();
pthread_cond_signal(&cond); //push一个任务后,要通知一个线程去消化
}

&emsp;

线程池测试

• Task任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
class Task
{
public:
Task(int base = 10)
:_base(base)
{}
void Run()
{
std::cout << "ThreadID:["<< pthread_self() << "]";
std::cout << " Run Task --- Base: " << _base * _base << std::endl;
}
~Task(){}
private:
int _base;
};

• 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
/******************* main.cpp *************************/
#include "ThreadPool.h"
int main()
{
ThreadPool<Task>* tp = new ThreadPool<Task>();
tp->PoolInit();

while(true)
{
int base = rand() % 10 + 1;
Task t(base);
tp->Push(t);
sleep(1);
}
return 0;
}

&emsp;

&emsp;

pthread线程池整体代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
#ifndef __THREAD_POOL_H_
#define __THREAD_POOL_H_
#include <iostream>
#include <unistd.h>
#include <pthread.h>
#include <queue>
#define NUM 5

class Task
{
public:
Task(int base = NUM)
:_base(base)
{}

void Run()
{
std::cout << "ThreadID:["<< pthread_self() << "]";
std::cout << " Run Task --- Base: " << _base * _base << std::endl;
}
~Task(){}
private:
int _base;
};

template <typename T>
class ThreadPool
{
private:
bool isEmpty()
{
return q.empty();
}

//定义static原因是在传参时并不打算传什么有效的参数
//但是由于它是类的成员函数,所以会有隐含的this指针
//传nullptr会报错,所以这里只能定义为static,然后传this指针了
static void* Routine(void* arg)
{
pthread_detach(pthread_self());
while(true)
{
ThreadPool* this_p = (ThreadPool*)arg;
pthread_mutex_lock(&this_p->lock);
while(!this_p->quit && this_p->isEmpty()) //quit
{
pthread_cond_wait(&this_p->cond, &this_p->lock);
}

//线程退出时,有可能队列为空,所以要判断一下,为空就不能取数据了
if(!this_p->isEmpty())
{
T data;
this_p->Get(data);
pthread_mutex_unlock(&this_p->lock);
data.Run(); //Run任务的之前,最好先解锁!因为线程运行到这里时,
} //并没有访问什么临界资源,这里的data是线程私有的部分,所以可以先解锁
else
{
pthread_mutex_unlock(&this_p->lock);
}
}
}

public:
//在测试线程池前,自己调用初始化函数 (不在构造函数中搞这个,因为有风险)
//万一在构造函数中创建失败了,那已经创建的对象怎么办? 空间浪费了
bool PoolInit()
{
pthread_t tid;
for(int i = 0; i < max_cap; ++i)
{
int ret = pthread_create(&tid, nullptr, Routine, this);
if(ret != 0)
{
std::cout << "thread create error!" << '\n';
return false;
}
}
return true;
}
public:
ThreadPool(size_t _cap = NUM)
:max_cap(_cap),
quit(false)
{
pthread_mutex_init(&lock, nullptr);
pthread_cond_init(&cond, nullptr);
}

void Get(T& data)
{
//Get这里并不需要加锁,因为消费者线程在调用Get之前,
//已经是加锁的状态了
T* tmp = q.front();
data = *tmp;
q.pop();
}

void Put(T& data)
{
pthread_mutex_lock(&lock);
q.push(&data);
pthread_mutex_unlock(&lock);
pthread_cond_signal(&cond);//放入一个任务后,叫醒一个消费者
//不然消费者会一直处于挂起
}

void ThreadQuit()
{
quit = true; //退出状态设为true
while(!isEmpty())
pthread_cond_broadcast(&cond);//持续唤醒所有线程,直到全部退出
}

~ThreadPool()
{
pthread_mutex_destroy(&lock);
pthread_cond_destroy(&cond);
}
private:
std::queue<T*> q; //生产者线程和消费者线程之间进行交互的队列
size_t max_cap;
pthread_mutex_t lock; //访问临街资源时要加的锁
pthread_cond_t cond; //让消费者线程等待的条件变量
bool quit; //退出状态
};

#endif