C++ 实现线程池
目录
- C++ 实现线程池
- 线程池的定义
- 线程池的组成
- 整体思路
- 项目结构
- 全局互斥锁
- 任务队列
- 任务队列元素
- 任务队列类
- 任务队列头文件
- 任务队列的实现
- 线程池类
- 线程池头文件
- 构造函数
- 工作者线程
- 管理者线程
- 工作者线程的销毁
- 线程退出函数
- 线程池的实现
- 测试
- CMakeLists.txt
使用 CMake 组织
线程池的定义和组成
参考课程
源代码
线程池的定义
线程池的基本概念是,在应用程序启动时创建一定数量的线程,并将它们保存在线程池中。当需要执行任务时,从线程池中获取一个空闲的线程,将任务分配给该线程执行。当任务执行完毕后,线程将返回到线程池,可以被其他任务复用。
线程池的组成
线程池一般有三个部分组成,主要包括:
- 任务队列:即待执行的任务,在本项目中这些任务都是一些需要完成功能的函数;
- 管理线程:该线程对整个线程池中的工作线程进行管理,根据任务个数和工作线程的个数来决定是否需要创建新的工作线程或销毁工作线程;
- 工作线程:执行任务的线程,在本项目中的用处为运行任务队列中的函数;
整体思路
- 任务队列用于存储需要运行的函数,在本项目中,将函数存储在
queue
中,其中存储类型为函数指针; - 管理者线程主要对线程池中的工作线程进行创建和销毁,其依据为线程池中存活工作线程的数量以及任务数的多少;
- 工作者线程,每次该线程从工作线程中取出一个函数执行;
项目结构
.
├── CMakeLists.txt
├── main.cpp
├── TaskQueue.cpp
├── TaskQueue.h
├── ThreadPool.cpp
└── ThreadPool.h
全局互斥锁
// main.cpp
pthread_mutex_t lock;
int main() {pthread_mutex_init(&lock, nullptr);......
}
// ThreadPool.h
extern pthread_mutex_t lock;
该锁为全局变量,使用 extern 关键字;因为在整个项目完成后,在打印操作时,有时会出现有些打印打印了一半被中断进行其他打印的情况。因此使用该锁来使得打印操作原子化。
任务队列
任务队列元素
任务队列中的元素即为任务,在本项目中即为函数,此处使用函数指针进行表示,其中参数为泛型:
using callback = void(*)(void*);
template <typename T>
struct Task {Task() {function = nullptr;arg = nullptr;}Task(callback f, void* a) {function = f;arg = (T*) a;}callback function;T* arg;
};
任务队列类
那么任务队列为:
任务队列的主体即为装载任务的队列,即
std::queue<Task<T>>
该任务队列有四个成员函数:
Task<T> getTask()
获取任务,即从队头获取任务,并从队列中删除;void addTask(callback f, void* a)
和void addTask(Task<T> t)
:在队尾添加任务;int getNum()
:获取任务队列中的任务个数;
两个成员变量:
std::queue<Task<T>>
:任务队列的主体即为装载任务的队列;pthread_mutex_t m_mutex
:互斥锁,多个线程在处理任务队列中可能会发生冲突,例如:获取任务时,线程 A 读取了队头元素后,CPU 被线程 B 抢占,但是此时队头元素未被删除,就会导致队头元素被执行两次。这会导致一些错误。因此需要使用互斥锁。
template <typename T>
class TaskQueue {
public:TaskQueue();~TaskQueue();// 获取任务Task<T> getTask();// 添加任务void addTask(callback f, void* a);void addTask(Task<T> t);// 获取任务个数inline int getNum() {return m_queue.size();}
private:// 队列std::queue<Task<T>> m_queue;// 互斥锁pthread_mutex_t m_mutex;
};
任务队列头文件
// 任务队列 TaskQueue.h
#ifndef _TASKQUEUE_H_
#define _TASKQUEUE_H_
#include <queue>
#include <pthread.h>using callback = void(*)(void*);
template <typename T>
struct Task {Task() {function = nullptr;arg = nullptr;}Task(callback f, void* a) {function = f;arg = (T*) a;}callback function;T* arg;
};template <typename T>
class TaskQueue {
public:TaskQueue();~TaskQueue();// 获取任务Task<T> getTask();// 添加任务void addTask(callback f, void* a);void addTask(Task<T> t);// 获取任务个数inline int getNum() {return m_queue.size();}
private:// 队列std::queue<Task<T>> m_queue;// 互斥锁pthread_mutex_t m_mutex;
};#endif // _TASKQUEUE_H_
任务队列的实现
// TaskQueue.cpp
#include "TaskQueue.h"template <typename T>
TaskQueue<T>::TaskQueue()
{pthread_mutex_init(&m_mutex, nullptr);
}template <typename T>
TaskQueue<T>::~TaskQueue()
{pthread_mutex_destroy(&m_mutex);
}template <typename T>
Task<T> TaskQueue<T>::getTask()
{Task<T> t;pthread_mutex_lock(&m_mutex);t = m_queue.front();m_queue.pop();pthread_mutex_unlock(&m_mutex);return t;
}template <typename T>
void TaskQueue<T>::addTask(callback f, void *a)
{Task t(f, a);this->addTask(t);
}template <typename T>
void TaskQueue<T>::addTask(Task<T> t)
{pthread_mutex_lock(&m_mutex);m_queue.push(t);pthread_mutex_unlock(&m_mutex);
}
线程池类
其中成员变量如下:
int m_maxNum; int m_minNum;
:线程池所拥有工作线程的上下限;int m_aliveNum;
:表示目前线程池中的线程个数;int m_busyNum;
:正在执行任务的线程数;int m_exitNum
:线程池目前需要销毁多少个线程,这是在管理者线程中进行控制,管理者线程会根据线程池中的线程数量和任务队列的大小来决定是否需要销毁线程池中的线程,以及销毁多少个;bool m_shutdown=false
:每次管理者线程和工作者线程运行时都会检查该变量,决定线程是否结束运行,通过该变量来销毁线程池中所有的线程;pthread_mutex_t m_lock
:由于线程池是多线程共同操作的,因此为了避免冲突,使用了互斥锁;pthread_cond_t m_notEmpty
:条件变量,当任务队列为空时,用于阻塞活跃的工作进程;当新增任务后,由会使用该条件变量唤醒被阻塞的工作进程;pthread_t* m_threadIDs
:工作线程数组,每个工作线程的线程 id 被保存到该数组中;pthread_tm_managerID;
:管理者线程 ID;TaskQueue<T>*m_taskQ
:任务队列;
该类的定义在头文件 ThreadPool.h
中:
线程池头文件
template <typename T>
class ThreadPool {
public:ThreadPool(int min, int max);~ThreadPool();void addTask(Task<T> task);int getBusyNum();int getAliveNum();void ThreadExit();private:static void* worker(void* arg);static void* manager(void* arg);private:// 互斥锁pthread_mutex_t m_lock;// 任务队列为空的条件变量pthread_cond_t m_notEmpty;// 工作线程数组pthread_t* m_threadIDs;// 管理者线程数组pthread_t m_managerID;// 任务队列TaskQueue<T>* m_taskQ;// 线程数上下限int m_maxNum;int m_minNum;// 现有的线程数int m_aliveNum;// 在忙的线程数int m_busyNum;// 需要销毁的线程数int m_exitNum;// 是否销毁线程池bool m_shutdown = false;
};
对该类中函数的解释分为两个部分,即静态成员函数和成员函数;
构造函数
四个主要部分:
- 为线程池中的变量赋初值,包括线程数的上下限等;
- 为工作线程数组分配空间;
- 初始化互斥锁和条件变量;
- 创建 min 个工作线程;那么初始状态即拥有
alive=min
个存活线程数;
template <typename T>
ThreadPool<T>::ThreadPool(int min, int max)
{m_taskQ = new TaskQueue<T>;m_minNum = min;m_maxNum = max;m_busyNum = 0;m_aliveNum = min;m_threadIDs = new pthread_t[max];memset(m_threadIDs, 0, sizeof(pthread_t) * max);if (pthread_mutex_init(&m_lock, nullptr) != 0 || pthread_cond_init(&m_notEmpty, nullptr) != 0) {std::cout << "mutex or cond init fail..." << std::endl;}for (int i = 0; i < min; i++) {pthread_create(&m_threadIDs[i], nullptr, worker, this);std::cout << "创建子线程,ID = " << std::to_string(m_threadIDs[i]) << std::endl;}pthread_create(&m_managerID, nullptr, manager, this);
}
工作者线程
其主要工作为从任务队列中取出任务并执行;
主要为三个过程:
- 检查任务队列是否为空,若为空则使用条件变量阻塞当前进程;
- 检查线程池是否关闭,若关闭则退出当前进程;
- 若任务队列不为空,线程池未关闭,则取出任务开始执行;
注意:只要涉及到线程池内部变量操作的都加上了互斥锁,在退出线程前,必须对锁进行释放,否则会导致死锁
// 工作者线程,从任务队列中取出任务进行执行
template <typename T>
void *ThreadPool<T>::worker(void *arg)
{ThreadPool* pool = static_cast<ThreadPool*>(arg);while (true) {pthread_mutex_lock(&pool->m_lock);// 当任务队列为空时进行阻塞while (pool->m_taskQ->getNum() == 0 && !pool->m_shutdown) {std::cout << "thread " << std::to_string(pthread_self()) << " waiting..." << std::endl;pthread_cond_wait(&pool->m_notEmpty, &pool->m_lock);// 唤醒后if (pool->m_exitNum > 0) {pool->m_exitNum--;if (pool->m_aliveNum > pool->m_minNum) {pool->m_aliveNum--;pthread_mutex_unlock(&pool->m_lock);pool->ThreadExit();}}}// 若线程池关闭则退出当前线程if (pool->m_shutdown) {pthread_mutex_unlock(&pool->m_lock);pool->ThreadExit();}// 任务队列不为空时Task task = pool->m_taskQ->getTask();pool->m_busyNum++;pthread_mutex_unlock(&pool->m_lock);// 执行任务pthread_mutex_lock(&lock);std::cout << "thread " << std::to_string(pthread_self()) << " start working" << std::endl;pthread_mutex_unlock(&lock);task.function(task.arg);delete task.arg;task.arg = nullptr;// 任务结束pthread_mutex_lock(&lock);std::cout << "thread " << std::to_string(pthread_self()) << " end working..." << std::endl;pthread_mutex_unlock(&lock);pthread_mutex_lock(&pool->m_lock);pool->m_busyNum--;pthread_mutex_unlock(&pool->m_lock);}return nullptr;
}
管理者线程
管理者线程每隔一段时间获取线程池中的线程数和任务数,根据这两个数据来决定创建还是销毁线程;
三个过程:
- 获取线程池中的数据,包括存活的线程数,任务数,忙碌的线程;
- 若线程数不够,即存活的线程数小于任务数,则在不超过最大值的情况下创建线程;
- 若线程数过量,即忙碌线程*2数量小于存活线程,说明线程过多,需要销毁;
// 管理者线程,每隔一段时间读取线程池中的数据对线程进行生成和销毁
template <typename T>
void *ThreadPool<T>::manager(void *arg)
{ThreadPool* pool = static_cast<ThreadPool*>(arg);while (!pool->m_shutdown) {sleep(5);// 取出线程数和任务数pthread_mutex_lock(&pool->m_lock);int aliveNum = pool->m_aliveNum;int taskNum = pool->m_taskQ->getNum();int busyNum = pool->m_busyNum;pthread_mutex_unlock(&pool->m_lock);const int NUMBER = 2;// 若线程数不够if (aliveNum < taskNum && aliveNum < pool->m_maxNum) {pthread_mutex_lock(&pool->m_lock);int num = 0;// 每次增加 NUMBER 个,线程数不能大于最大线程数,线程 id 必须在范围内for (int i = 0; num < NUMBER && pool->m_aliveNum < pool->m_maxNum &&i < pool->m_maxNum; i++){// 对未创建的线程进行创建if (pool->m_threadIDs[i] == 0) {pthread_create(&pool->m_threadIDs[i], nullptr, worker, pool);num++;pool->m_aliveNum++;}}pthread_mutex_unlock(&pool->m_lock);}// 若线程数过多,忙碌线程数 * 2 小于 存活线程数if (busyNum * 2 < aliveNum && aliveNum > pool->m_minNum) {pthread_mutex_lock(&pool->m_lock);// 需要销毁的线程数pool->m_exitNum = NUMBER;pthread_mutex_unlock(&pool->m_lock);// 唤醒等量的线程让其自动销毁for (int i = 0; i < NUMBER; i++) {pthread_cond_signal(&pool->m_notEmpty);}}}return nullptr;
}
工作者线程的销毁
注意销毁的实现是通过以下代码的配合:
// manager
// 若线程数过多,忙碌线程数 * 2 小于 存活线程数
if (busyNum * 2 < aliveNum && aliveNum > pool->m_minNum) {pthread_mutex_lock(&pool->m_lock);// 需要销毁的线程数pool->m_exitNum = NUMBER;pthread_mutex_unlock(&pool->m_lock);// 唤醒等量的线程让其自动销毁for (int i = 0; i < NUMBER; i++) {pthread_cond_signal(&pool->m_notEmpty);}
}
若需要销毁线程,在对 exitNum
赋值后唤醒工作进程;
pthread_cond_wait(&pool->m_notEmpty, &pool->m_lock);
// 唤醒后
if (pool->m_exitNum > 0) {pool->m_exitNum--;if (pool->m_aliveNum > pool->m_minNum) {pool->m_aliveNum--;pthread_mutex_unlock(&pool->m_lock);pool->ThreadExit();}
}
工作进程被唤醒后将会执行以上代码;
线程退出函数
两个主要任务
- 工作者线程数组中的值进行归零;
- 退出当前线程;
template <typename T>
void ThreadPool<T>::ThreadExit()
{pthread_t tid = pthread_self();for (int i = 0; i < m_maxNum; i++) {if (m_threadIDs[i] == tid) {std::cout << "threadExit() function: thread " << std::to_string(pthread_self()) << " exiting..." << std::endl;m_threadIDs[i] = 0;break;}}pthread_exit(&tid);
}
其他函数较为简单,此处不再赘述。
线程池的实现
// ThreadPool.cpp
#include "ThreadPool.h"
#include <string.h>
#include <iostream>
#include <string>
#include <unistd.h>template <typename T>
ThreadPool<T>::ThreadPool(int min, int max)
{m_taskQ = new TaskQueue<T>;m_minNum = min;m_maxNum = max;m_busyNum = 0;m_aliveNum = min;m_threadIDs = new pthread_t[max];memset(m_threadIDs, 0, sizeof(pthread_t) * max);if (pthread_mutex_init(&m_lock, nullptr) != 0 || pthread_cond_init(&m_notEmpty, nullptr) != 0) {std::cout << "mutex or cond init fail..." << std::endl;}for (int i = 0; i < min; i++) {pthread_create(&m_threadIDs[i], nullptr, worker, this);std::cout << "创建子线程,ID = " << std::to_string(m_threadIDs[i]) << std::endl;}pthread_create(&m_managerID, nullptr, manager, this);
}template <typename T>
ThreadPool<T>::~ThreadPool()
{m_shutdown = true;// 销毁管理者线程pthread_join(m_managerID, nullptr);// 销毁管理者线程for (int i = 0; i < m_aliveNum; i++) {pthread_cond_signal(&m_notEmpty);}if (m_taskQ) delete m_taskQ;if (m_threadIDs) delete[] m_threadIDs;pthread_mutex_destroy(&m_lock);pthread_cond_destroy(&m_notEmpty);
}template <typename T>
void ThreadPool<T>::addTask(Task<T> task)
{if (m_shutdown) {return;}// 添加时不用枷锁,添加函数中有锁m_taskQ->addTask(task);// 唤醒工作线程pthread_cond_signal(&m_notEmpty);
}template <typename T>
int ThreadPool<T>::getBusyNum()
{pthread_mutex_lock(&m_lock);int num = m_busyNum;pthread_mutex_unlock(&m_lock);return num;
}template <typename T>
int ThreadPool<T>::getAliveNum()
{pthread_mutex_lock(&m_lock);int num = m_aliveNum;pthread_mutex_unlock(&m_lock);return 0;
}template <typename T>
void ThreadPool<T>::ThreadExit()
{pthread_t tid = pthread_self();for (int i = 0; i < m_maxNum; i++) {if (m_threadIDs[i] == tid) {std::cout << "threadExit() function: thread " << std::to_string(pthread_self()) << " exiting..." << std::endl;m_threadIDs[i] = 0;break;}}pthread_exit(&tid);
}// 工作者线程,从任务队列中取出任务进行执行
template <typename T>
void *ThreadPool<T>::worker(void *arg)
{ThreadPool* pool = static_cast<ThreadPool*>(arg);while (true) {pthread_mutex_lock(&pool->m_lock);// 当任务队列为空时进行阻塞while (pool->m_taskQ->getNum() == 0 && !pool->m_shutdown) {std::cout << "thread " << std::to_string(pthread_self()) << " waiting..." << std::endl;pthread_cond_wait(&pool->m_notEmpty, &pool->m_lock);// 唤醒后if (pool->m_exitNum > 0) {pool->m_exitNum--;if (pool->m_aliveNum > pool->m_minNum) {pool->m_aliveNum--;pthread_mutex_unlock(&pool->m_lock);pool->ThreadExit();}}}// 若线程池关闭则退出当前线程if (pool->m_shutdown) {pthread_mutex_unlock(&pool->m_lock);pool->ThreadExit();}// 任务队列不为空时Task task = pool->m_taskQ->getTask();pool->m_busyNum++;pthread_mutex_unlock(&pool->m_lock);// 执行任务pthread_mutex_lock(&lock);std::cout << "thread " << std::to_string(pthread_self()) << " start working" << std::endl;pthread_mutex_unlock(&lock);task.function(task.arg);delete task.arg;task.arg = nullptr;// 任务结束pthread_mutex_lock(&lock);std::cout << "thread " << std::to_string(pthread_self()) << " end working..." << std::endl;pthread_mutex_unlock(&lock);pthread_mutex_lock(&pool->m_lock);pool->m_busyNum--;pthread_mutex_unlock(&pool->m_lock);}return nullptr;
}// 管理者线程,每隔一段时间读取线程池中的数据对线程进行生成和销毁
template <typename T>
void *ThreadPool<T>::manager(void *arg)
{ThreadPool* pool = static_cast<ThreadPool*>(arg);while (!pool->m_shutdown) {sleep(5);// 取出线程数和任务数pthread_mutex_lock(&pool->m_lock);int aliveNum = pool->m_aliveNum;int taskNum = pool->m_taskQ->getNum();int busyNum = pool->m_busyNum;pthread_mutex_unlock(&pool->m_lock);const int NUMBER = 2;// 若线程数不够if (aliveNum < taskNum && aliveNum < pool->m_maxNum) {pthread_mutex_lock(&pool->m_lock);int num = 0;// 每次增加 NUMBER 个,线程数不能大于最大线程数,线程 id 必须在范围内for (int i = 0; num < NUMBER && pool->m_aliveNum < pool->m_maxNum &&i < pool->m_maxNum; i++){// 对未创建的线程进行创建if (pool->m_threadIDs[i] == 0) {pthread_create(&pool->m_threadIDs[i], nullptr, worker, pool);num++;pool->m_aliveNum++;}}pthread_mutex_unlock(&pool->m_lock);}// 若线程数过多,忙碌线程数 * 2 小于 存活线程数if (busyNum * 2 < aliveNum && aliveNum > pool->m_minNum) {pthread_mutex_lock(&pool->m_lock);// 需要销毁的线程数pool->m_exitNum = NUMBER;pthread_mutex_unlock(&pool->m_lock);// 唤醒等量的线程让其自动销毁for (int i = 0; i < NUMBER; i++) {pthread_cond_signal(&pool->m_notEmpty);}}}return nullptr;
}
测试
#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include "ThreadPool.h"
#include "ThreadPool.cpp"pthread_mutex_t lock;void taskFunc(void* arg) {int* num = static_cast<int*>(arg);pthread_mutex_lock(&lock);std::cout << "thread " << pthread_self() << " is working, number = " << *num << std::endl;pthread_mutex_unlock(&lock);sleep(1);
}int main() {pthread_mutex_init(&lock, nullptr);ThreadPool<int> pool(3, 10);for (int i = 0; i < 100; i++) {int* num = new int(i + 100);pool.addTask(Task<int>(taskFunc, num));}sleep(20);pthread_mutex_destroy(&lock);return 0;
}
结果:
创建子线程,ID = 140560222742080
thread 140560222742080 waiting...
创建子线程,ID = 140560214349376
创建子线程,ID = 140560205956672
thread 140560222742080 start working
thread 140560222742080 is working, number = 100
thread 140560205956672 start working
thread 140560205956672 is working, number = 101
thread 140560214349376 start working
thread 140560214349376 is working, number = 102
thread 140560222742080 end working...
......
thread 140559977014848 is working, number = 198
thread 140560205956672 end working...
thread 140560205956672 start working
thread 140560205956672 is working, number = 199
thread 140560222742080 end working...
thread 140560222742080 waiting...
thread 140560214349376 end working...
thread 140560214349376 waiting...
thread 140559951836736 end working...
thread 140559951836736 waiting...
thread 140559943444032 end working...
thread 140559943444032 waiting...
thread 140559968622144 end working...
thread 140559968622144 waiting...
thread 140559960229440 end working...
thread 140559960229440 waiting...
thread 140559985407552 end working...
thread 140559985407552 waiting...
thread 140559977014848 end working...
thread 140559977014848 waiting...
thread 140560205956672 end working...
thread 140560205956672 waiting...
threadExit() function: thread 140560222742080 exiting...
threadExit() function: thread 140559960229440 exiting...
threadExit() function: thread 140560214349376 exiting...
threadExit() function: thread 140559943444032 exiting...
threadExit() function: thread 140559985407552 exiting...
threadExit() function: thread 140559951836736 exiting...
threadExit() function: thread 140559968622144 exiting...
threadExit() function: thread 140559977014848 exiting...
threadExit() function: thread 140560205956672 exiting...
CMakeLists.txt
cmake_minimum_required(VERSION 3.0)
project(POOL)
file(GLOB SRC ${CMAKE_CURRENT_SOURCE_DIR}/*.cpp)
add_executable(app ${SRC})