您的位置:首页 > 健康 > 美食 > TinyWebSever源码逐行注释(三)_ thread_pool.cpp

TinyWebSever源码逐行注释(三)_ thread_pool.cpp

2024/10/5 20:21:48 来源:https://blog.csdn.net/qq_39969848/article/details/142062686  浏览:    关键词:TinyWebSever源码逐行注释(三)_ thread_pool.cpp

前言

项目源码地址
项目详细介绍

项目简介:
Linux下C++轻量级Web服务器,助力初学者快速实践网络编程,搭建属于自己的服务器.

  1. 使用 线程池 + 非阻塞socket + epoll(ET和LT均实现) + 事件处理(Reactor和模拟Proactor均实现) 的并发模型
  2. 使用状态机解析HTTP请求报文,支持解析GET和POST请求
  3. 访问服务器数据库实现web端用户注册、登录功能,可以请求服务器图片和视频文件
  4. 实现同步/异步日志系统,记录服务器运行状态
  5. 经Webbench压力测试可以实现上万的并发连接数据交换

thread_pool.cpp用于配置web服务器的线程池,使用一个工作队列完全解除了主线程和工作线程的耦合关系:主线程往工作队列中插入任务,工作线程通过竞争来取得任务并执行它。主要内容如下:

  • 同步I/O模拟proactor模式
  • 半同步/半反应堆
  • 线程池

原项目地址的注释较少不适合初学者,于是我将每行都加上了注释帮助大家更好的理解:

#ifndef THREADPOOL_H
#define THREADPOOL_H#include <list>
#include <cstdio>
#include <exception>
#include <pthread.h>
#include "../lock/locker.h" // 包含自定义的锁机制(互斥锁和信号量)
#include "../CGImysql/sql_connection_pool.h" // 数据库连接池类,用于处理数据库连接// 定义一个线程池类,T是模板类型,表示任务的类型
template <typename T>
class threadpool
{
public:/* * 构造函数,初始化线程池 * actor_model 表示工作模式,connPool 是数据库连接池,thread_number 是线程池中的线程数,* max_request 是最大允许的请求队列长度*/threadpool(int actor_model, connection_pool *connPool, int thread_number = 8, int max_request = 10000);// 析构函数,释放线程池资源~threadpool();// 将新的请求任务添加到队列中,附带状态bool append(T *request, int state);// 将新的请求任务添加到队列中,不附带状态bool append_p(T *request);private:/* 工作线程运行的函数,它会不断从工作队列中取任务执行 */static void *worker(void *arg);// 实际处理任务的函数,从任务队列中取出任务并处理void run();private:int m_thread_number;        // 线程池中的线程数int m_max_requests;         // 请求队列中允许的最大请求数pthread_t *m_threads;       // 描述线程池的数组,其大小为 m_thread_numberstd::list<T *> m_workqueue; // 请求队列,用于存储需要处理的任务locker m_queuelocker;       // 保护请求队列的互斥锁,避免多线程同时访问时产生竞态条件sem m_queuestat;            // 信号量,表示是否有任务需要处理connection_pool *m_connPool;  // 数据库连接池,用于任务处理时的数据库操作int m_actor_model;          // 模型切换,表示不同的处理模式
};// 构造函数,初始化线程池
template <typename T>
threadpool<T>::threadpool(int actor_model, connection_pool *connPool, int thread_number, int max_requests): m_actor_model(actor_model), m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL), m_connPool(connPool)
{// 如果线程数或请求数不合法,抛出异常if (thread_number <= 0 || max_requests <= 0)throw std::exception();// 动态分配线程数组m_threads = new pthread_t[m_thread_number];if (!m_threads)throw std::exception();// 循环创建线程,每个线程调用 worker 函数来执行任务for (int i = 0; i < thread_number; ++i){// 创建线程,worker 是线程的入口函数if (pthread_create(m_threads + i, NULL, worker, this) != 0){delete[] m_threads; // 创建线程失败,清理已分配的资源throw std::exception();}// 线程分离模式,线程结束后自动释放资源if (pthread_detach(m_threads[i])){delete[] m_threads;throw std::exception();}}
}// 析构函数,释放线程数组的资源
template <typename T>
threadpool<T>::~threadpool()
{delete[] m_threads;
}// 将请求任务加入请求队列,并指定任务的状态(读或写)
template <typename T>
bool threadpool<T>::append(T *request, int state)
{// 加锁,确保对队列的操作是线程安全的m_queuelocker.lock();// 如果请求队列已满,解锁并返回 falseif (m_workqueue.size() >= m_max_requests){m_queuelocker.unlock();return false;}// 设置请求的状态request->m_state = state;// 将请求添加到队列的末尾m_workqueue.push_back(request);// 解锁m_queuelocker.unlock();// 通知有新任务要处理m_queuestat.post();return true;
}// 将请求任务加入请求队列,不指定状态
template <typename T>
bool threadpool<T>::append_p(T *request)
{// 加锁,确保对队列的操作是线程安全的m_queuelocker.lock();// 如果请求队列已满,解锁并返回 falseif (m_workqueue.size() >= m_max_requests){m_queuelocker.unlock();return false;}// 将请求添加到队列的末尾m_workqueue.push_back(request);// 解锁m_queuelocker.unlock();// 通知有新任务要处理m_queuestat.post();return true;
}// 线程入口函数,线程从任务队列中取任务处理
template <typename T>
void *threadpool<T>::worker(void *arg)
{// 将传入的参数转换为线程池对象threadpool *pool = (threadpool *)arg;// 调用线程池的 run 函数,执行任务pool->run();return pool;
}// 处理任务的主函数,循环从请求队列中取任务并处理
template <typename T>
void threadpool<T>::run()
{while (true){// 等待有任务到来,信号量阻塞线程m_queuestat.wait();// 加锁以安全访问请求队列m_queuelocker.lock();// 如果队列为空,解锁并继续等待if (m_workqueue.empty()){m_queuelocker.unlock();continue;}// 从队列头取出一个请求T *request = m_workqueue.front();// 移除取出的请求m_workqueue.pop_front();// 解锁m_queuelocker.unlock();// 如果请求为空,继续处理下一个请求if (!request)continue;// 根据不同的 actor 模型处理请求if (1 == m_actor_model){// 读操作if (0 == request->m_state){// 调用 read_once 尝试读取数据if (request->read_once()){// 数据读取成功,标记为需要进一步处理request->improv = 1;// 使用数据库连接池处理数据库相关任务connectionRAII mysqlcon(&request->mysql, m_connPool);// 处理请求request->process();}else{// 数据读取失败,设置定时器标志request->improv = 1;request->timer_flag = 1;}}else{// 写操作if (request->write()){// 数据写入成功,标记为处理完成request->improv = 1;}else{// 数据写入失败,设置定时器标志request->improv = 1;request->timer_flag = 1;}}}else{// 如果使用另一种模型,不区分读写,直接处理connectionRAII mysqlcon(&request->mysql, m_connPool);request->process();}}
}
#endif

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com