Linux线程池
- 一、线程池的概念
- 二、线程池的优点
- 三、线程池的应用场景
- 四、线程池的代码实现
一、线程池的概念
线程池是一种线程使用模式。
线程过多会带来调度开销,进而影响缓存局部和整体性能,而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。
二、线程池的优点
- 线程池避免了在处理短时间任务时创建与销毁线程的代价。
- 线程池不仅能够保证内核充分利用,还能防止过分调度。
注意: 线程池中可用线程的数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
三、线程池的应用场景
线程池常见的应用场景如下:
- 需要大量的线程来完成任务,且完成任务的时间比较短。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。
相关解释:
- 像Web服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。
- 对于长时间的任务,比如Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 突发性大量客户请求,在没有线程池的情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,但短时间内产生大量线程可能使内存到达极限,出现错误。
四、线程池的代码实现
下面我们实现一个简单的线程池,线程池中提供了一个任务队列,以及若干个线程(多线程)。
- 线程池中的多个线程负责从任务队列当中拿任务,并将拿到的任务进行处理
- 线程池对外提供一个Push接口,用于让外部线程能够将任务Push到任务队列当中。
线程池的代码如下:
ThreadPool.hpp
#pragma once
#include <iostream>
#include "Thread.hpp"
#include <string>
#include <vector>
#include <queue>
#include <unistd.h>
#include "Task.hpp"
#include <functional>using namespace ThreadMoudle;
using namespace std;static const int gdefault = 5;void test() // 这里正好 跟回调函数一样:返回值和参数均为空
{while (true){sleep(1);cout << " hello ThreadPool " << endl;}
}template <typename T>
class Threadpool
{private:void LockQueue(){pthread_mutex_lock(&_mutex);}void UnlockQueue(){pthread_mutex_unlock(&_mutex);}void Wakeup(){pthread_cond_signal(&_cond);}void WakeupAll(){pthread_cond_broadcast(&_cond);}void Sleep(){pthread_cond_wait(&_cond, &_mutex);}bool IsEmpty(){return _task_queue.empty();}void HandlerTask(string &name) // 这个函数是 每个线程都要执行的任务!!!{while (true){LockQueue();while (IsEmpty() && _isrunning){_sleep_thread_num++;Sleep();_sleep_thread_num--;}// 只需要判定一种情况了:只要这两个不一起满足,线程池就必须一直运行!if (IsEmpty() && _isrunning){cout << name << " is quit" << endl;UnlockQueue();break;}// 做到此处 就是不为空:有任务T t = _task_queue.front(); // ????????????这里的T的含义????????????_task_queue.pop();UnlockQueue();t(); // 处理任务 一定不能在 临界区进行!!!cout << name << ":" << t.result() << endl;}}public:Threadpool(int thread_num = gdefault): _thread_num(thread_num), _isrunning(false), _sleep_thread_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}void Init(){func_t func = std::bind(&Threadpool::HandlerTask, this, std::placeholders::_1);for (int i = 0; i < _thread_num; i++){string threadname = "thread- " + to_string(i + 1);_threads.emplace_back(threadname, func);}}void Start(){_isrunning = true;for (auto &thread : _threads){thread.Start();}}void Stop(){LockQueue();_isrunning = false;WakeupAll();UnlockQueue();}void Equeue(const T &in){LockQueue();if (_isrunning){_task_queue.push(in);if (_sleep_thread_num > 0){Wakeup();}}UnlockQueue();}~Threadpool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}private:int _thread_num;vector<Thread> _threads;queue<T> _task_queue; // 这里的T的含义:是不是 队列里面每个元素的数据类型?????????bool _isrunning;int _sleep_thread_num;pthread_mutex_t _mutex;pthread_cond_t _cond;
};
为什么线程池中需要有互斥锁和条件变量?
- 加互斥锁的原因:线程池中的任务队列是会被多个执行流同时访问的临界资源,因此我们需要引入互斥锁对任务队列进行保护。
- 加条件变量的原因:线程池当中的线程要从任务队列里拿任务,前提条件是任务队列中必须要有任务,因此线程池当中的线程在拿任务之前,需要先判断任务队列当中是否有任务,若此时任务队列为空,那么该线程应该进行等待,直到任务队列中有任务时再将其唤醒,因此我们需要引入条件变量。
- 当外部线程向任务队列中Push一个任务后,此时可能有线程正处于等待状态,因此在新增任务后需要唤醒在条件变量下等待的线程。
注意:
- 当某线程被唤醒时,其可能是被异常或是伪唤醒,或者是一些广播类的唤醒线程操作而导致所有线程被唤醒,使得在被唤醒的若干线程中,只有个别线程能拿到任务。此时应该让被唤醒的线程再次判断是否满足被唤醒条件,所以在判断任务队列是否为空时,应该使用while进行判断,而不是if。
- pthread_cond_broadcast函数的作用是唤醒条件变量下的所有线程,而外部可能只Push了一个任务,我们却把全部在等待的线程都唤醒了,此时这些线程就都会去任务队列获取任务,但最终只有一个线程能得到任务。一瞬间唤醒大量的线程可能会导致系统震荡,这叫做惊群效应。因此在唤醒线程时最好使用pthread_cond_signal函数唤醒正在等待的一个线程即可。
- 当线程从任务队列中拿到任务后,该任务就已经属于当前线程了,与其他线程已经没有关系了,因此应该在解锁之后再进行处理任务,而不是在解锁之前进行。因为处理任务的过程可能会耗费一定的时间,所以我们不要将其放到临界区当中。
- 如果将处理任务的过程放到临界区当中,那么当某一线程从任务队列拿到任务后,其他线程还需要等待该线程将任务处理完后,才有机会进入临界区。此时虽然是线程池,但最终我们可能并没有让多线程并行的执行起来。
任务类型的设计
我们将线程池进行了模板化,因此线程池当中存储的任务类型可以是任意的,但无论该任务是什么类型的,在该任务类当中都必须包含一个Run方法,当我们处理该类型的任务时只需调用该Run方法即可。
例如,下面我们实现一个计算任务类:
#pragma once#include<iostream>
#include<pthread.h>using namespace std;class Task
{public:Task(){}Task(int x,int y):_x(x),_y(y){}void Execute(){_result= _x+_y;}void operator()(){Execute();// 直接用仿函数() 调佣 Execute();}// 添加一个debug函数为了方便打印!string debug(){string msg= to_string(_x) + " + " + to_string(_y) + " = ? ";return msg;}string result(){string msg=to_string(_x) + " + " + to_string(_y) + " = " + to_string(_result);return msg;}~Task(){}private:int _x;int _y;int _result;
};
此时线程池内的线程不断从任务队列拿出任务进行处理,而它们并不需要关心这些任务是哪来的,它们只需要拿到任务后执行对应的Run方法即可。
主函数:main.cc
主线程就负责不断向任务队列当中Push任务就行了,此后线程池当中的线程会从任务队列当中获取到这些任务并进行处理。
#include"Threadpool.hpp"
#include"Task.hpp"
int main()
{Threadpool<Task>* tp=new Threadpool<Task>(5);tp->Init();tp->Start();int cnt=10;while(cnt){// 向线程池不断地推送任务!sleep(1);Task t(1,1);tp->Equeue(t);sleep(1);cout<<"cnt: "<<cnt--<<endl;}tp->Stop();cout<<" thread pool is stop"<<endl;sleep(5);return 0;
}
这里我们主线程控制的是让他执行10次1+1的打印,最后自动退出
注意: 此后我们如果想让线程池处理其他不同的任务请求时,我们只需要提供一个任务类,在该任务类当中提供对应的任务处理方法就行了。