1 .线程池概念
- 线程池就是一种多线程处理形式。处理过程中可以将任务添加到队列中。然后创建线程后启动这些任务用创建的线程去执行。
- 线程过多会带来调度开销,进而影响缓存局部性和整体性能。
- 线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。
- 线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。
2 .为什么使用线程池
使用线程池可以根据系统的需求和硬件环境灵活的控制线程的数量,且可以对所有线程进行统一的管理和控制,从而提高系统的运行效率,降低系统运行运行压力;当然了,使用线程池的原因不仅仅只有这些,我们可以从线程池自身的优点上来进一步了解线程池的好处;
- 线程和任务分离,提升线程重用性;
- 控制线程并发数量,降低服务器压力,统一管理所有线程;
- 提升系统响应速度,假如创建线程用的时间为T1,执行任务用的时间为T2,销毁线程用的时间为T3,那么使用线程池就免去了T1和T3的时间;也就是说,减少了线程创建与销毁的次数。
3 . 线程池的应用场景
- 需要大量的线程来完成任务,且完成任务的时间比较短。
- WEB服务器完成网页请求这样的任务,使用线程池技术是非常合适的。因为单个任务小,而任务数量巨大,你可以想象一个热门网站的点击次数。 但对于长时间的任务,比如一个Telnet连接请求,线程池的优点就不明显了。因为Telnet会话时间比线程的创建时间大多了。
- 对性能要求苛刻的应用,比如要求服务器迅速响应客户请求。
- 接受突发性的大量请求,但不至于使服务器因此产生大量线程的应用。突发性大量客户请求,在没有线程池情况下,将产生大量线程,虽然理论上大部分操作系统线程数目最大值不是问题,短时间内产生大量线程可能使内存到达极限,出现错误.
4 . 线程池的实现
-
现在要实现一个主线程不停的往任务队列中放任务,然后让线程池中的线程从任务队列中获取任务,再进行任务处理的一个简易线程池。
-
实现的这个简易的线程池整体分为线程池主体部分代码、任务部分代码、单个部分代码这三部分。
这里线程池的实现用到了单例模式,下面介绍单例模式:
什么是单例模式:
- 单例模式是一种 "经典的, 常用的, 常考的" 设计模式.
单例模式的特点
- 某些类, 只应该具有一个对象(实例), 就称之为单例.
- 例如一个男人只能有一个媳妇.
- 在很多服务器开发场景中, 经常需要让服务器加载很多的数据 (上百G) 到内存中. 此时往往要用一个单例的类来管理这些数据
饿汉实现方式和懒汉实现方式
吃完饭 , 立刻洗碗 , 这种就是饿汉方式 . 因为下一顿吃的时候可以立刻拿着碗就能吃饭 .吃完饭 , 先把碗放下 , 然后下一顿饭用到这个碗了再洗碗 , 就是懒汉方式 .
- 懒汉方式最核心的思想是 "延时加载". 从而能够优化服务器的启动速度.
饿汉方式实现单例模式
template <typename T>
class Singleton {
static T data;
public:
static T* GetInstance() {
return &data;
}
};
- 只要通过 Singleton 这个包装类来使用 T 对象, 则一个进程中只有一个 T 对象的实例
懒汉方式实现单例模式
template <typename T>
class Singleton {
static T* inst;
public:
static T* GetInstance() {
if (inst == NULL) {
inst = new T();
}
return inst;
}
};
- 懒汉方式存在一个严重的问题, 线程不安全.
- 第一次调用 GetInstance 的时候, 如果两个线程同时调用, 可能会创建出两份 T 对象的实例.
- 但是后续再次调用, 就没有问题了
懒汉方式实现单例模式 ( 线程安全版本 )
// 懒汉模式, 线程安全
template <typename T>
class Singleton {
volatile static T* inst; // 需要设置 volatile 关键字, 否则可能被编译器优化.
static std::mutex lock;
public:
static T* GetInstance() {
if (inst == NULL) { // 双重判定空指针, 降低锁冲突的概率, 提高性能.
lock.lock(); // 使用互斥锁, 保证多线程情况下也只调用一次 new.
if (inst == NULL) {
inst = new T();
}
lock.unlock();
}
return inst;
}
};
注意事项 :
- 加锁解锁的位置
- 双重 if 判定, 避免不必要的锁竞争
- volatile关键字防止过度优化
下面我们用懒汉方式造一个简易的线程池:
- ThreadPool.hpp
#pragma once
#include "LockGuard.hpp"
#include "Thread.hpp"
#include "Log.hpp"
#include <vector>
#include <functional>
#include <queue>
#include <unistd.h>using namespace hcc;
using namespace ThreadMoudle;
const int gdefaultnum = 5;
template <class T>
class ThreadPool
{//将加锁和解锁封装为函数void LockQueue(){pthread_mutex_lock(&_mutex);}void UnLockQueue(){pthread_mutex_unlock(&_mutex);}//唤醒所有线程或者单个线程void Wake(){pthread_cond_signal(&_cond);}void WakeAll(){pthread_cond_broadcast(&_cond);}//判断任务列表中还有没有任务bool IsEmpty(){return _task_queue.empty();}//条件变量等待void Sleep(){pthread_cond_wait(&_cond, &_mutex);}//构造线程池,初始化ThreadPool(int thread_num = gdefaultnum) : _thread_num(thread_num), _isrunning(true), _sleep_thread_num(0){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}//处理任务函数void HandlerTask(const std::string &name){while (true){LockQueue();while (IsEmpty() && _isrunning){//没有任务等待任务到来,被唤醒后去执行任务_sleep_thread_num++;Sleep();_sleep_thread_num--;}if (!IsEmpty() && !_isrunning) // 处理有任务,但是没有执行的情况{ UnLockQueue();std::cout << name << " quit" << "\n";break;}// 处理任务// std::cout << name << " 处理任务" << "\n";LOG(INFO,"%s 正在处理任务\n",name.c_str());T task;task = _task_queue.front();_task_queue.pop();task();UnLockQueue();}}//初始化创建 _thread_num 个线程,并用vector管理起来void init(){func_t func = std::bind(&ThreadPool::HandlerTask, this, std::placeholders::_1);for (int i = 0; i < _thread_num; i++){std::string name = "thread - " + std::to_string(i + 1);_threads.emplace_back(name, func);}Start();}//private里面放了构造函数。这里给单例模式提供了条件
public://放任务到任务列表void Equeue(T &in){LockQueue();if (_isrunning){_task_queue.push(in);std::cout << "make a data" << "\n";std::cout << _task_queue.size() << std::endl;if (_sleep_thread_num > 0)Wake();}UnLockQueue();}//将所有线程都启动void Start(){_isrunning = true;for (auto &thread : _threads) // 这里不加引用的话可能会乱码{thread.Start();}}//停止所有线程void Stop(){LockQueue();_isrunning = false;WakeAll();std::cout << "_sleep_thread_num " << _sleep_thread_num << "\n";UnLockQueue();std::cout << "Stop all thread" << std::endl;LOG(INFO,"threadpool heve stop excute\n");}//懒汉方式设计单例模式static ThreadPool<T>*GetInstance(){if(_tp==nullptr){//加一把锁防止同时两个线程都去创建线程池//这里用一个类来封装该锁LockGuard lockguard(&_sig_mutex);if(_tp==nullptr){_tp=new ThreadPool();_tp->init();_tp->Start();LOG(DEBUG,"Create a new threadpool\n");}//已经创建了一个线程了,就返回该线程else {LOG(WARNING,"already have a threadpool\n ");}}return _tp;}~ThreadPool(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_cond);}//单例模式要把构造和赋值构造删除或者私有化ThreadPool(const ThreadPool<T>&)=delete;void operator=(const ThreadPool<T>&)=delete;
private:int _thread_num; // 线程个数std::vector<Thread> _threads; //管理线程的结构std::queue<T> _task_queue; //任务列表 用队列保存bool _isrunning; //判断线程是否工作int _sleep_thread_num; //休眠线程个数pthread_mutex_t _mutex; //互斥锁pthread_cond_t _cond; //条件变量//单例模式static ThreadPool<T> *_tp;static pthread_mutex_t _sig_mutex; //给单例模式设计的锁
};//类的静态成员要在类外初始化——const成员要在初始化列表初始化
template<class T>
ThreadPool<T>*ThreadPool<T>::_tp=nullptr;//单例模式的锁不能在构造函数初始化,只能在外面初始化
template<class T>
pthread_mutex_t ThreadPool<T>::_sig_mutex=PTHREAD_MUTEX_INITIALIZER;
- Thread.hpp
#pragma once
#include <iostream>
#include <string>
#include <pthread.h>
#include <functional>
#include <unistd.h>
namespace ThreadMoudle
{// typedef void (*func_t)(const std::string name);using func_t = std::function<void(const std::string &)>;class Thread{void Excute(){std::cout << _name << " is running " << std::endl;_isrunning = true;_func(_name);_isrunning = false;}//单个线程要执行的方法static void *ThreadRoutine(void *args){Thread *self = static_cast<Thread *>(args);self->Excute();return nullptr;}public:Thread(const std::string name, func_t func): _name(name), _func(func){}bool Start(){int n = ::pthread_create(&_tid, nullptr, ThreadRoutine, this);if (n != 0)return false;return true;}std::string Status(){if (_isrunning)return "running";elsereturn "sleep";}void Join(){::pthread_join(_tid, nullptr);std::cout << _name << " join" << std::endl;}void Stop(){if (_isrunning){::pthread_cancel(_tid);_isrunning = false;std::cout << _name << " Stop" << std::endl;}}~Thread() {}private:std::string _name;pthread_t _tid;bool _isrunning;func_t _func; // 线程要执行的回调函数};
}
- Task.hpp 这里我们创建一个执行加法的任务
#pragma once
#include <iostream>
#include <functional>// typedef std::function<void()> task_t;
using Task_t = std::function<void(const std::string&)>;class Task
{void Excute(){_result = _x + _y;std::cout<<result()<<"\n";}public:Task(){}Task(int x, int y) : _x(x), _y(y) {}void operator()(){Excute();}std::string debug(){std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + " =?";return msg;}std::string result(){std::string msg = std::to_string(_x) + "+" + std::to_string(_y) + " = " + std::to_string(_result);return msg;}private:int _x;int _y;int _result;
};
- LockGuard.hpp RALL编程思想
#include<iostream>
#include<pthread.h>class LockGuard
{public:LockGuard(pthread_mutex_t*mutex):_mutex(mutex){pthread_mutex_lock(_mutex);}~LockGuard(){pthread_mutex_unlock(_mutex);}private:pthread_mutex_t *_mutex;};
这里我们还增加了日志:日志的实现主要就是将当前地点的时间,位置等信息打印出来
- Log.hpp
#include <iostream>
#include <string>
#include <cstring>
#include <cstdarg>
#include <fstream>namespace hcc
{enum Level{DEBUG = 1,INFO,WARNING,ERROR,FATAL};std::string LevelToString(int level){switch (level){case DEBUG:return "DEBUG";case INFO:return "INFO";case WARNING:return "WARNING";case ERROR:return "ERROR";case FATAL:return "FATAL";default:return "UNKNOWN";}}class logmessage{public:std::string _level;pid_t _id;std::string _filename;int _filenumber;std::string _cur_time;std::string _message_info;};std::string GetCurrTime(){time_t now = time(nullptr);struct tm *curr_time = localtime(&now);char buffer[128];snprintf(buffer, sizeof(buffer), "%d-%02d-%02d %02d:%02d:%02d",curr_time->tm_year + 1900,curr_time->tm_mon + 1,curr_time->tm_mday,curr_time->tm_hour,curr_time->tm_min,curr_time->tm_sec);return buffer;}#define SCREEN_TYPE 1
#define FILE_TYPE 2const std::string glogfile = "./log.txt";pthread_mutex_t glock = PTHREAD_MUTEX_INITIALIZER;class Log{public:Log(const std::string &logfile = glogfile) : _logfile(glogfile){}void Enable(int type){_type = type;}void FulushLogToScreen(const logmessage &lg){printf("[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._cur_time.c_str(),lg._message_info.c_str());}void FlushLogToFile(const logmessage &lg){std::ofstream out(_logfile, std::ios::app);if (!out.is_open())return;char logtxt[2048];snprintf(logtxt, sizeof(logtxt), "[%s][%d][%s][%d][%s] %s",lg._level.c_str(),lg._id,lg._filename.c_str(),lg._filenumber,lg._cur_time.c_str(),lg._message_info.c_str());out.write(logtxt, std::strlen(logtxt));out.close();}void FlushLog(const logmessage &lg){LockGuard lockguard(&glock);switch (_type){case SCREEN_TYPE:FulushLogToScreen(lg);break;case FILE_TYPE:FlushLogToFile(lg);break;}}void logMessage(std::string filename, int filenumber, int level, const char *format, ...){logmessage lg;lg._level = LevelToString(level);lg._id = getpid();lg._filename = filename;lg._filenumber = filenumber;lg._cur_time = GetCurrTime();va_list ap;va_start(ap, format);char log_info[1024];vsnprintf(log_info, sizeof(log_info), format, ap);va_end(ap);lg._message_info = log_info;FlushLog(lg);}private:int _type;std::string _logfile;};Log lg;
#define LOG(Level, Format, ...) \do \{ \lg.logMessage(__FILE__, __LINE__, Level, Format, ##__VA_ARGS__); \} while (0)#define EnableScreen() \do \{ \lg.Enable(SCREEN_TYPE); \} while (0)#define EnableFILE() \do \{ \lg.Enable(FILE_TYPE); \} while (0)
}
执行结果:
可以看见,每次执行的任务都是线程交替执行。日志也在我想的地方打印出来了。