您的位置:首页 > 科技 > 能源 > 今天广西紧急通知最新_网上推广产品哪个平台效果好_seo快速优化技术_下载百度

今天广西紧急通知最新_网上推广产品哪个平台效果好_seo快速优化技术_下载百度

2025/4/22 17:31:46 来源:https://blog.csdn.net/weixin_74792326/article/details/147278537  浏览:    关键词:今天广西紧急通知最新_网上推广产品哪个平台效果好_seo快速优化技术_下载百度
今天广西紧急通知最新_网上推广产品哪个平台效果好_seo快速优化技术_下载百度

目录

SERVER服务器模块实现:

1、Buffer模块:缓冲区模块

2、套接字Socket类实现: 

3、事件管理Channel类实现:

4、 描述符事件监控Poller类实现:

5、定时任务管理TimerWheel类实现:

eventfd 

6、Reactor-EventLoop线程池类实现:


SERVER服务器模块实现:

1、Buffer模块:缓冲区模块

提供的功能:存储数据,取出数据

实现思想:

1、实现缓冲区得有一块内存空间,采取vector<char>

vector底层其实使用的就是一个线性的内存空间

2、要素:

        a、默认的空间大小

        b、当前的读取数据数据位置

        c、当前的写入数据位置

3、操作

        a、写入数据:

        当前写入位置指向哪里,就从哪里开始写

        如果后面剩余空间不够了

        1、考虑整体缓冲区空闲空间是否足够(因为读位置也会向后偏移,前面有可能会有空闲空间)

        足够:将数据移动到起始位置即可

        不够:给数组vector扩容

数据一旦写入成功,当前位置,就要向后移动

        2、读取数据:

        当前的读取位置指向哪里,就从哪里开始读取,前提是有数据可读

        可读数据大小:当前写入位置减去当前读取位置

实现缓冲区类,该如何设计:
class   Buffer{

private:

        std::vector<char>  _buffer;

        //位置,是一个相对偏移量,而不是绝对地址

        uint64_t  _read_idx;//相对写偏移量

        uint64_t  _weite_idx;//相对读偏移量

public:

        1、获取当前写位置地址

        2、确保可写空间足够(移动+ 扩容)

        3、获取前沿空闲空间大小

        4、获取后沿空闲空间大小

        5、将写位置向后移动指定长度

        6、获取当前读位置的地址

        7、获取可读空间大小

        8、将读位置向后移动指定长度

        9、清理功能

代码实现: 

#include <iostream>
#include <vector>
#include <string>
#include <cassert>
#include <cstring>
#define BUFFER_DEFAULT_SIZE 1024
class Buffer
{private:std::vector<char> _buffer;//使用vector进行内存空间管理uint64_t _reader_idx;//读偏移uint64_t _writer_idx; //写偏移public:Buffer():_reader_idx(0),_writer_idx(0),_buffer(BUFFER_DEFAULT_SIZE){}char *Begin(){return &*_buffer.begin();} //获取起始位置//获取当前写入起始位置   //buffer空间起始地址+写偏移量char *WritePosition(){ return Begin() + _writer_idx;}//获取当前读取起始位置char *ReadPosition(){return Begin() + _reader_idx;}//获取缓冲区末尾空闲空间大小 --- 写偏移之后的空闲空间----总体空间大小减去写偏移uint64_t TailIdleSize() {return _buffer.size() - _writer_idx;}//获取缓冲区起始空闲空闲空间大小 --- 读偏移之前的空闲时间uint64_t HeadIdleSize() {return _reader_idx;}//获取可读数据大小uint64_t ReadAbleSize() {return _writer_idx - _reader_idx;}//将读偏移向后移动void MoveReadOffset(uint64_t len) { if(len == 0) return;//向后移动得大小必须小于可读数据得大小assert(_reader_idx + len <= ReadAbleSize());_reader_idx += len;}//将写偏移向后移动void MoveWriteOffset(uint64_t len){//向后移动得大小,必须小于当前后边的空闲空间大小assert(len <= TailIdleSize());_writer_idx += len;}//确保可写空间足够(整体空闲空间够了就移动,不够就扩容)void EnsureWriteSpace(uint64_t len){//如果末尾空间大小足够,直接返回if(TailIdleSize() >= len) return;//末尾空闲空间不够,则加上起始空间大小是否足够if(len <= TailIdleSize() + HeadIdleSize()){//够了,将数据移动到起始位置uint64_t rsz = ReadAbleSize();//把当前数据大小先保存起来std::copy(ReadPosition(),ReadPosition() + rsz,Begin());//把可读数据拷贝到起始位置_reader_idx = 0;//将读偏移归0_writer_idx = rsz;//将写位置置为可读数据大小,因为当前的可读数据大小就是写偏移量}else{//总体空间不够,则需要扩容,不移动数据,直接给写偏移之后扩容足够空间即可_buffer.resize(_writer_idx + len);}}//写入数据 void Write(const void *data,uint64_t len){//1、保证有足够空间 2、拷贝数据进去if(len == 0) return;EnsureWriteSpace(len);const char *d = (const char *)data;std::copy(d, d + len, WritePosition());}void WriteAndPush(const void *data, uint64_t len){Write(data,len);MoveWriteOffset(len);}void WriteString(const std::string &data){return  Write(data.c_str(), data.size());}void WriteStringAndPush(const std::string &data){WriteString(data);MoveWriteOffset(data.size());}void WriteBuffer(Buffer &data){Write(data.ReadPosition(), data.ReadAbleSize());}void WriteBufferAndPush(Buffer &data){WriteBuffer(data);MoveWriteOffset(data.ReadAbleSize());}//读取数据void Read(void *buf, uint64_t len){//要求要获取的数据大小必须小于可读数据大小assert(len <= ReadAbleSize());std::copy(ReadPosition(),ReadPosition() + len, (char*)buf);}void readAndPop(void *buf, uint64_t len){Read(buf, len); //读数据MoveReadOffset(len);//指针向后移动}std::string ReadAsString(uint64_t len){assert(len <= ReadAbleSize());std::string str;str.resize(len);Read(&str[0], len);return str;}std::string ReadAsStringAndPop(uint64_t len){std::string str = ReadAsString(len); //读数据MoveReadOffset(len);//指针向后移动return str;}char *FindCRLF() //查找换行符{char *res = (char*)memchr(ReadPosition(),'\n', ReadAbleSize());return res;}std::string GetLine() //取出一行{char *pos = FindCRLF();if(pos == NULL){return "";}//+1是为了把换行符也取出来return ReadAsString(pos - ReadPosition() + 1);}std::string GetLineAndPop(){std::string str = GetLine();MoveReadOffset(str.size());}//清空缓冲区void Clear(){//只需要将偏移量归0 覆盖写即可_reader_idx = 0;_writer_idx = 0; }
};

2、套接字Socket类实现: 

  • 创建套接字

  • 绑定地址信息

  • 开始监听

  • 向服务器发起连接

  • 获取新连接

  • 接收数据

  • 发送数据

  • 关闭套接字

  • 创建一个服务端连接

  • 创建一个客户端连接

  • 设置套接字选项---开启地址端口重用 一个连接绑定端口和地址之后,一旦主动断开连接他就会进入timewait保护状态,套接字并不会立即被释放,所以ip地址和端口号就依然被占用,无法立即使用它。在服务器的使用中就会造成服务器一旦出了问题,就会无法立即启动,因此要开启地址和端口重用。

  • 设置套接字的阻塞属性---设置为非阻塞  阻塞是当缓冲区中没有数据了套接字就一直等,程序就无法向后执行,因此要设置为非阻塞

代码实现:

#define MAX_LISTEN 1024
class Socket {private:int _sockfd;public:Socket():_sockfd(-1) {}Socket(int fd): _sockfd(fd) {}~Socket() { Close(); }int Fd() { return _sockfd; }//创建套接字bool Create() { //因为创建套接字可能会失败,失败之后如何处理由使用者来决定// int socket(int domain, int type, int protocol)_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);//ipv4、流式套接字、tcp协议(协议类型)if (_sockfd < 0) {ERR_LOG("CREATE SOCKET FAILED!!");return false;}return true;}//绑定地址信息bool Bind(const std::string &ip, uint16_t port) { //要告诉绑定什么struct sockaddr_in addr; //组织地址结构addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);// int bind(int sockfd, struct sockaddr*addr, socklen_t len);int ret = bind(_sockfd, (struct sockaddr*)&addr, len);if (ret < 0) {ERR_LOG("BIND ADDRESS FAILED!");return false;}return true;}//开始监听bool Listen(int backlog = MAX_LISTEN) { //MAX_LISTEN同一时间最大并发连接数// int listen(int backlog)int ret = listen(_sockfd, backlog);//将一个套接字的状态设置为listen状态,并设置同一时间最大连接数if (ret < 0) {ERR_LOG("SOCKET LISTEN FAILED!");return false;}return true;}//向服务器发起连接bool Connect(const std::string &ip, uint16_t port) {struct sockaddr_in addr;addr.sin_family = AF_INET;addr.sin_port = htons(port);addr.sin_addr.s_addr = inet_addr(ip.c_str());socklen_t len = sizeof(struct sockaddr_in);// int connect(int sockfd, struct sockaddr*addr, socklen_t len);int ret = connect(_sockfd, (struct sockaddr*)&addr, len);if (ret < 0) {ERR_LOG("CONNECT SERVER FAILED!");return false;}return true;}//获取新连接int Accept() { //获取新连接的描述符返回// int accept(int sockfd, struct sockaddr *addr, socklen_t *len);int newfd = accept(_sockfd, NULL, NULL);//通过监听套接字获取一个新建连接的描述符并且返回当前连接上的客户端的地址信息(但是这些地址信息并没有用到,因此只需要获取新的描述符即可)if (newfd < 0) {ERR_LOG("SOCKET ACCEPT FAILED!");return -1;}return newfd;}//接收数据ssize_t Recv(void *buf, size_t len, int flag = 0) { //有符号长整型// ssize_t recv(int sockfd, void *buf, size_t len, int flag);ssize_t ret = recv(_sockfd, buf, len, flag);if (ret <= 0) {//EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误//EINTR  表示当前socket的阻塞等待,被信号打断了,if (errno == EAGAIN || errno == EINTR) {return 0;//表示这次接收没有接收到数据}ERR_LOG("SOCKET RECV FAILED!!");return -1;}return ret; //实际接收的数据长度}ssize_t NonBlockRecv(void *buf, size_t len) { //非阻塞接收数据return Recv(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞。}//发送数据ssize_t Send(const void *buf, size_t len, int flag = 0) {// ssize_t send(int sockfd, void *data, size_t len, int flag);ssize_t ret = send(_sockfd, buf, len, flag);if (ret < 0) {if (errno == EAGAIN || errno == EINTR) {return 0;}ERR_LOG("SOCKET SEND FAILED!!");return -1;}return ret;//实际发送的数据长度}ssize_t NonBlockSend(void *buf, size_t len) { //非阻塞发送数据if (len == 0) return 0;return Send(buf, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前发送为非阻塞。}//关闭套接字void Close() {if (_sockfd != -1) {close(_sockfd);_sockfd = -1;}}//创建一个服务端连接bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0", bool block_flag = false) {//1. 创建套接字,2. 绑定地址,3. 开始监听,4. 设置非阻塞, 5. 启动地址重用if (Create() == false) return false;if (block_flag) NonBlock();//开启非阻塞if (Bind(ip, port) == false) return false;if (Listen() == false) return false;ReuseAddress(); //开启地址重用return true;}//创建一个客户端连接bool CreateClient(uint16_t port, const std::string &ip) {//1. 创建套接字,2.指向连接服务器if (Create() == false) return false;if (Connect(ip, port) == false) return false; //给的是服务器的ip地址和端口号return true;}//设置套接字选项---开启地址端口重用void ReuseAddress() {// int setsockopt(int fd, int leve, int optname, void *val, int vallen)int val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int));//第二个参数 表示选项所在的协议层 SOL_SOCKET 代表套接字层//SO_REUSEADDR 允许在绑定地址时,即使该地址已被占用,//只要原占用的套接字处于 TIME_WAIT 状态,新的套接字也能绑定该地址。val = 1;setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int));//SO_REUSEPORT:允许多个套接字绑定到相同的地址和端口,不过前提是这些套接字都设置了该选项。//这在负载均衡和多线程 / 多进程网络编程中非常有用。}//设置套接字阻塞属性-- 设置为非阻塞void NonBlock() {//int fcntl(int fd, int cmd, ... /* arg */ );int flag = fcntl(_sockfd, F_GETFL, 0);fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);}
};

3、事件管理Channel类实现:

 事件触发后的处理的管理:
1、需要处理的事件:可读、可写、挂断、错误、任意

2、事件处理回调函数

成员:因为后边使用epoll进行事件监控

EPOLLIN      可读

EPOLLOUT  可写

EPOLLRDHUP  连接断开

EPOLLPRI    优先数据

EPOLLERR    出错了

EPOLLHUP   挂断

而以上的事件都是数值    uint32_t 进行保存

要进行事件管理,就需要有一个uint32_t  类型的成员保存当前需要监控的事件

事件处理这里,因为有五种事件需要处理,就需要五个回调函数

代码实现: 

class Poller;
class EventLoop;
//Channel用于管理文件描述符的事件监控和处理
class Channel {private:int _fd;//要监控的对象EventLoop *_loop;//EventLoop 事件循环的核心类,负责事件的轮询和分发uint32_t _events;  // 当前需要监控的事件 //uint32_t 每个位可以表示一个特定事件类型uint32_t _revents; // 当前连接触发的事件//事件----读、写、错误、连接断开、任意事件被触发using EventCallback = std::function<void()>;EventCallback _read_callback;   //可读事件被触发的回调函数EventCallback _write_callback;  //可写事件被触发的回调函数EventCallback _error_callback;  //错误事件被触发的回调函数EventCallback _close_callback;  //连接断开事件被触发的回调函数EventCallback _event_callback;  //任意事件被触发的回调函数public:Channel(EventLoop *loop, int fd):_fd(fd), _events(0), _revents(0), _loop(loop) {}int Fd() { return _fd; }uint32_t Events() { return _events; }//获取想要监控的事件void SetREvents(uint32_t events) { _revents = events; }//设置实际就绪的事件void SetReadCallback(const EventCallback &cb) { _read_callback = cb; }void SetWriteCallback(const EventCallback &cb) { _write_callback = cb; }void SetErrorCallback(const EventCallback &cb) { _error_callback = cb; }void SetCloseCallback(const EventCallback &cb) { _close_callback = cb; }void SetEventCallback(const EventCallback &cb) { _event_callback = cb; }//当前是否监控了可读bool ReadAble() { return (_events & EPOLLIN); } //& 都为1是1,否则为0 即如果结果不为0就是监控了读事件//当前是否监控了可写bool WriteAble() { return (_events & EPOLLOUT); }//同理//启动读事件监控void EnableRead() { _events |= EPOLLIN; Update(); }//或 (有一个为1就是1,两个都是0,才是0)//将读事件添加到需要监控的事件集合中//启动写事件监控void EnableWrite() { _events |= EPOLLOUT; Update(); }//关闭读事件监控void DisableRead() { _events &= ~EPOLLIN; Update(); }//&EPOLLIN的~ 就会将原来为1的位置置0(&运算---只有全都为1,才是1,否则为0)//读0000 0001 写0000 0010 _events 里对应 EPOLLIN 的那一位就会被置为 0,也就意味着取消了对可读事件的监控//关闭写事件监控void DisableWrite() { _events &= ~EPOLLOUT; Update(); }//关闭所有事件监控void DisableAll() { _events = 0; Update(); }//移除监控---从epoll的红黑树上直接进行移除void Remove();void Update();//更新事件监控状态//事件处理,一旦连接触发了事件,就调用这个函数,自己触发了什么事件如何处理自己决定void HandleEvent() {if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) {//只有当_revents有相应的事件(即对应位为1时,&才会为1)//检查是否触发了可读事件、检查对方是否关闭了连接的写端(半关闭)、检查是否有紧急数据可读/*不管任何事件,都调用的回调函数*/if (_read_callback) _read_callback();}/*有可能会释放连接的操作事件,一次只处理一个*/if (_revents & EPOLLOUT) { //检查是否触发了可写事件if (_write_callback) _write_callback();}else if (_revents & EPOLLERR) { //检查是否触发了错误事件if (_error_callback) _error_callback();//一旦出错,就会释放连接,因此要放到前边调用任意回调}else if (_revents & EPOLLHUP) { //检查是否触发了连接断开事件if (_close_callback) _close_callback();}if (_event_callback) _event_callback();//无论前面处理了哪些具体事件,只要 _event_callback 不为空,都会调用该回调函数。//这个回调函数可以用于处理一些通用的事件逻辑,例如记录日志、统计事件次数等。}
};

4、 描述符事件监控Poller类实现:

通过epoll实现对描述符的IO事件监控

封装思想:
       1、必须拥有一个epoll操作句柄

        2、拥有一个struct epoll_event结构数组,监控时保存所有的活跃事件

        3、使用哈希表管理描述符与描述符对应的事件管理Channel对象

逻辑流程:

        1、对描述符进行监控,通过Channel才能知道描述符需要监控什么事件

        2、当描述符就绪了,通过描述符在哈希表中找到对应的Channel(得到了channel才知道什么事件如何处理)

        当描述符就绪了,返回就绪描述符对应的channel

public:(对外的接口)添加或更新描述符所监控的事件、移除描述符的监控、开始监控,获取就绪的channel

Channel类和Poller类的关系:

Channel类:负责封装单个文件描述符的事件管理和处理逻辑。记录文件描述符一般都需要监控的事件(可读、可写、错误等等),并且为不同类型的事件设置对应的回调函数。(当文件描述符上的事件触发,Channel类会调用相应的回调函数进行处理)

对文件描述符的事件管理进行封装,记录fd要监控的事件,并在触发时调用相应的回调函数。(只是管理,不进行监控)

Poller类:作为一个事件轮询器,负责管理多个Channel对象。他通过epoll机制来监听所有注册的文件描述符上的事件,并在有事件发生的时候通知对应的Channel对象。类提供了添加、修改和移除事件监控的接口,以及开始轮询事件的功能。

(管理多个Channel对象,使用epoll机制进行监控)

协作流程

  1. 注册阶段:用户创建channel对象并设置好需要监控的事件和回调函数,然后通过Poller类的UpdateEvent方法将Channel对象注册到Poller中Poller会将Channel对象的文件描述符和对应的事件信息添加到epoll实例中进行监控。
  2. 轮询阶段:Poller类调用epoll_wait函数进入轮询状态,等待文件描述符上的事件发生。当有事件发生时,Poller 会获取到就绪的文件描述符列表,并根据文件描述符找到对应的Channel对象。
  3. 事件处理阶段:Poller会调用 Channel 对象的SetREvent方法设置实际触发的事件,然后将Channel对象添加到活跃列表中。最后,用户可以从活跃列表中取出Channel对象,并调用其Handlevent方法处理事件

代码实现:

#define MAX_EPOLLEVENTS 1024
class Poller {private:int _epfd;struct epoll_event _evs[MAX_EPOLLEVENTS];std::unordered_map<int, Channel *> _channels;private://对epoll的直接操作void Update(Channel *channel, int op) {// int epoll_ctl(int epfd, int op,  int fd,  struct epoll_event *ev);int fd = channel->Fd();struct epoll_event ev;//存储要监控的事件信息ev.data.fd = fd;ev.events = channel->Events();//将 Channel 对象中需要监控的事件设置到 ev.events 中int ret = epoll_ctl(_epfd, op, fd, &ev);//_epfd 是 epoll 实例的文件描述符,fd 是要操作的文件描述符if (ret < 0) {ERR_LOG("EPOLLCTL FAILED!");}return;}//判断一个Channel是否已经添加了事件监控bool HasChannel(Channel *channel) {//判断指定的Channel对象是否以及添加到epoll实例的监控列表中auto it = _channels.find(channel->Fd());if (it == _channels.end()) {return false;}return true;}public:Poller() { //构造函数 创建epoll实例_epfd = epoll_create(MAX_EPOLLEVENTS); //创建epoll实例if (_epfd < 0) {ERR_LOG("EPOLL CREATE FAILED!!");abort();//退出程序}}//添加或修改监控事件void UpdateEvent(Channel *channel) {bool ret = HasChannel(channel);if (ret == false) {//不存在则添加_channels.insert(std::make_pair(channel->Fd(), channel));//它的作用是创建一个 std::pair 对象 能够存储两个不同类型的值,分别称为 first 和 secondreturn Update(channel, EPOLL_CTL_ADD);//添加}return Update(channel, EPOLL_CTL_MOD);//修改}//移除监控void RemoveEvent(Channel *channel) {auto it = _channels.find(channel->Fd());if (it != _channels.end()) {_channels.erase(it);}Update(channel, EPOLL_CTL_DEL);}//开始监控,返回活跃连接void Poll(std::vector<Channel*> *active) {// int epoll_wait(int epfd, struct epoll_event *evs, int maxevents, int timeout)int nfds = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1);//_epfd 是 epoll 实例的文件描述符,_evs 是存储就绪事件的数组,//MAX_EPOLLEVENTS 是一次最多能处理的事件数量,-1 表示无限等待,直到有事件发生if (nfds < 0) {  //nfds 表示就绪事件的数量if (errno == EINTR) {return ;}ERR_LOG("EPOLL WAIT ERROR:%s\n", strerror(errno));abort();//退出程序}for (int i = 0; i < nfds; i++) {auto it = _channels.find(_evs[i].data.fd);assert(it != _channels.end());it->second->SetREvents(_evs[i].events);//设置实际就绪的事件active->push_back(it->second);}return;}
};void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }

5、定时任务管理TimerWheel类实现:

定时器模块整合:

timefd:实现内核每隔一段时间,给线程一次超时事件(timerfd可读)

timerwheel:实现每次执行Runtimetask,都可以执行一波到期的定时任务

要实现一个完整的秒级定时器,就需要将这两个功能整合到一起

timerfd设置为每秒钟触发一次定时事件,当事件触发,则运行一次timerwheel的runtimertask,执行一下所有的过期定时任务

而timerfd的事件监控与触发,可以融合EventLoop来实现

TimerTask.hpp  TimerWheel.hpp

class TimerTask  //这个类代表定时器任务
{
private:uint64_t _id; //定时器任务对象IDuint32_t _timeout; //定时器任务的超时时间bool _canceled;  //false表示没有被取消  true表示被取消了TaskFunc _task_cb; //定时器对象要执行的定时任务    ---  任务回调函数ReleaseFunc _release; //定时任务结束时 用于删除 TimerWheel中保存的定时器对象信息public:TimerTask(uint64_t id, uint32_t delay, const TaskFunc &cb) :_id(id),_timeout(delay),//外界自己传入_task_cb(cb),_canceled(false){}~TimerTask()  //执行定时器任务{if(_canceled == false)_task_cb(); //当定时任务触发时,需要执行的具体操作//在析构的时候执行是因为 定时器的任务是销毁不活跃的连接 那么 他的本质任务就是销毁 即可以在类对象析构的时候任务对象被销毁//具体执行什么函数会自己设置 在这个任务构造的时候 需要自己传入的参数第三个_release();// 从TimerWheel 的 _timers 哈希表中删除当前定时器任务的信息 --调用这个函数就是调用TimerWheel类中的RemoveTimer(因为下面的bind函数)}void Cancel(){_canceled = true; //true代表已经被取消}void SetRelease(const ReleaseFunc &cb)  //传入的参数是函数{_release = cb; }uint32_t DelayTime(){return _timeout;}
};class TimerWheel { ///管理这些定时器任务private:using WeakTask = std::weak_ptr<TimerTask>;using PtrTask = std::shared_ptr<TimerTask>;int _tick;      //当前的秒针,走到哪里释放哪里,释放哪里,就相当于执行哪里的任务int _capacity;  //表盘最大数量---其实就是最大延迟时间std::vector<std::vector<PtrTask>> _wheel; //二维数组 里面存放的是定时器任务的指针指针std::unordered_map<uint64_t, WeakTask> _timers;EventLoop *_loop;//定时器超时 读取一次数据 运行过期任务int _timerfd;//定时器描述符--可读事件回调就是读取计数器,执行定时任务std::unique_ptr<Channel> _timer_channel;private:void RemoveTimer(uint64_t id) { //从哈希表中删除任务 通过任务的id找到任务auto it = _timers.find(id);if (it != _timers.end()) {_timers.erase(it);}}static int CreateTimerfd() {int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);//创建一个定时器文件描述符//CLOCK_MONOTONIC 表示单调时钟,它从系统启动时开始计时,不会因为系统时间的调整(如设置系统时间)而发生跳变,适合用于测量时间间隔和定时任务if (timerfd < 0) {ERR_LOG("TIMERFD CREATE FAILED!");abort();}//int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old);struct itimerspec itime;  //定时器每秒触发一次超时itime.it_value.tv_sec = 1;itime.it_value.tv_nsec = 0;//第一次超时时间为1s后itime.it_interval.tv_sec = 1; itime.it_interval.tv_nsec = 0; //第一次超时后,每次超时的间隔时//第一次超时时间为 1 秒后,之后每次超时的间隔也是 1 秒timerfd_settime(timerfd, 0, &itime, NULL);//设置定时器的超时时间,最后返回创建好的定时器文件描述符return timerfd;}int ReadTimefd() {uint64_t times;//有可能因为其他描述符的事件处理花费事件比较长,然后在处理定时器描述符事件的时候,有可能就已经超时了很多次//read读取到的数据times就是从上一次read之后超时的次数int ret = read(_timerfd, &times, 8);//从定时器文件描述符 _timerfd 中读取数据,数据表示从上一次读取之后的超时次数if (ret < 0) {ERR_LOG("READ TIMEFD FAILED!");abort();}return times;}//这个函数应该每秒钟被执行一次,相当于秒针向后走了一步void RunTimerTask() {_tick = (_tick + 1) % _capacity; //_tick指到哪里哪里被清理_wheel[_tick].clear();//清空指定位置的数组,就会把数组中保存的所有管理定时器对象的shared_ptr释放掉}void OnTime() {//根据实际超时的次数,执行对应的超时任务int times = ReadTimefd();for (int i = 0; i < times; i++) {RunTimerTask(); //给超时任务规定执行的次数 即根据超时事件的基本单位 来确定超时次数 再通过超时一次 执行一次定时任务}}//定时任务的添加必须在EventLoop线程中去添加void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb)//添加定时任务 --第三个参数就是定时器任务触发时,具体需要执行的任务{PtrTask pt(new TimerTask(id, delay, cb));pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));//在添加定时任务的时候,就将id和将RemoveTimer绑定形成一个新的函数,并将这个函数设置为 TimerTask 对象的 _release 回调函数,即在添加定时任务的时候就已经设置好了,该任务在超时的时候应该执行什么任务int pos = (_tick + delay) % _capacity;_wheel[pos].push_back(pt);//数组_timers[id] = WeakTask(pt); //_timers哈希表中,值为id的元素(如果有就跟新,如果没有就新创建)  WeakTask(pt)----以pt这个 std::shared_ptr为参数构建了一个std::weak_ptr<TimerTask> 类型的弱引用}void TimerRefreshInLoop(uint64_t id)//刷新/延迟定时任务{//通过保存的定时器对象的weak_ptr构造一个share_ptr出来,添加到轮子中auto it = _timers.find(id);if(it == _timers.end()){return;//没找到定时任务,无法进行刷新,无法延迟}PtrTask pt = it->second.lock(); //lock获取weak_ptr管理的对象对应的share_ptr//it->second代表  与id对应的 std::weak_ptr<TimerTask> 对象//std::weak_ptr 类的一个成员函数,它的作用是尝试创建一个指向 std::weak_ptr 所观察对象的 std::shared_ptr//从 _timers 哈希表中找到与给定 id 对应的 std::weak_ptr<TimerTask> 对象,//然后调用其 lock() 方法尝试获取一个指向该 TimerTask 对象的 std::shared_ptr。//如果该 TimerTask 对象还存在(即其引用计数不为 0),则 lock() 方法会返回一个有效的 std::shared_ptr,//并将其赋值给 pt;如果该 TimerTask 对象已经被销毁(引用计数为 0),则 lock() 方法会返回一个空的 std::shared_ptr。//为什么这样写????//由于 _timers 中存储的是 std::weak_ptr,我们不能直接通过它来操作对象。//因此,需要调用 lock() 方法获取一个 std::shared_ptr,这样才能确保在操作对象时,对象是存在的。//同时,使用 std::shared_ptr 操作对象可以保证在操作期间对象不会被意外销毁,因为 std::shared_ptr 会增加对象的引用计数。int dalay = pt->DelayTime();//DelayTime() 这个时间外界自己传入int pos = (_tick + dalay) % _capacity;_wheel[pos].push_back(pt); //重新更新位置}void TimerCancelInLoop(uint64_t id){auto it = _timers.find(id);if(it != _timers.end()){return;//没找到定时任务,无法进行刷新,无法延迟}PtrTask pt = it->second.lock(); //lock获取weak_ptr管理的对象对应的share_ptrif(pt)pt->Cancel();}public:TimerWheel(EventLoop *loop):_capacity(60), _tick(0), _wheel(_capacity), _loop(loop), _timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd)) {_timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this));//(Channel 类的对象)_timer_channel 通常代表一个事件通道,用于管理某个文件描述符 这里是是定时器文件描述符 _timerfd 的事件和回调//通过调用 SetReadCallback 方法,将 OnTime 函数设置为当该通道对应的文件描述符有可读事件发生时要执行的回调函数//OnTime 它会读取定时器文件描述符中的超时次数,并根据超时次数执行相应的定时器任务_timer_channel->EnableRead();//启动读事件监控}/*定时器中有个_timers成员,定时器信息的操作有可能在多线程中进行,因此需要考虑线程安全问题*//*如果不想加锁,那就把对定期的所有操作,都放到一个线程中进行*/void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb);//刷新/延迟定时任务void TimerRefresh(uint64_t id);void TimerCancel(uint64_t id);/*这个接口存在线程安全问题--这个接口实际上不能被外界使用者调用,只能在模块内,在对应的EventLoop线程内执行*/bool HasTimer(uint64_t id) {auto it = _timers.find(id);if (it == _timers.end()) {return false;}return true;}
};

eventfd 

eventfd:一种事件通知机制

创建一个描述符用于实现事件通知

eventfd本质在内核里面管理管理的就是一个计数器

创建eventfd就会在内核中创建一个计数器(结构),每当向eventfd中写入一个数值-----用于表示事件通知的次数。可以使用read进行数据的读写,读取到的数据就是通知的次数

假设每次给eventfd中写入一个1,就表示通知了1次,连续写了三次之后,再去read读取出来的数字就是3,读取之后计数清0.

用处:在EventLoop模块中实现线程间的事件通知功能。

#include <sys/eventfd.h>

int  eventfd(unsigned  int  intval, int  flags);

功能:创建一个eventfd对象,实现事件通知

参数: 

initval:计数初值     

flags:EFD_CLOEXEC--禁止进程复制(表示在执行exec系列函数时关闭该文件描述符)

            EFD_NONBLOCK ---启动非阻塞属性

返回值:返回一个文件描述符用于操作

eventfd也是通过read/write/close进行操作的

注意:read 和 write进行IO的时候数据只能是一个8字节的数据

int  mian()
{int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);if(efd < 0){perror("eventfd failed!!");return -1;}uint64_t val = 1;write(efd, &val, sizeof(val));write(efd, &val, sizeof(val));write(efd, &val, sizeof(val));uint64_t res = 0;read(efd, &res, sizeof(res));printf("%ld\n", res);return 0;
}运行结果: 3

6、Reactor-EventLoop线程池类实现:

有多少个线程就有多少了EventLoop 

监控了一个连接,而这个连接一旦就绪,就要进行事件处理。如果在连接处理过程中,这个连接又触发了其他的事件处理,会不会被分配到其他线程中去处理。如果这个描述符,在多个线程中都触发了事件,进行处理,就会存在线程安全问题(如果每一个连接都创建一把锁,显然不现实,消耗很大)。

因此我们需要将一个连接事件监控,以及连接事件处理,以及其他操作都放在同一个线程中进行(一个连接无法绑定一个线程,可是一个EventLoop对应一个线程,我们可以把一个连接绑定到EventLoop中)

如何保证一个连接的所有操作都在eventloop对应的线程中?

解决方案:给eventloop模块中添加一个任务队列。对连接的所有操作都进行一次封装,将对连接的操作并不直接执行,而是当作任务添加到任务队列中。

eventloop处理流程:

1、在线程中对描述符进行事件监控

2、又描述符就绪则对描述符进行事件处理(如何保证处理回调函数的操作都在线程中)

3、所有的就绪事件处理完了,这时候再去将任务队列中的所有任务一一执行

这样能保证对于连接的所有操作,都是在一个线程中执行的,不涉及线程安全问题

但是对于任务队列的操作有线程安全问题,只需要给task的操作加一把锁即可

1、事件监控:

使用Poller模块  有事件就绪则进行事件处理

2、执行任务队列中的任务  

一个线程安全的任务队列

注意:因为有可能因为等待描述符IO事件就绪,导致执行流流程阻塞,这时候任务队列中的任务将得不到执行

因此,要有一个事件通知的东西,能够唤醒事件监控的阻塞

当事件就绪,需要处理的时候,处理过程中,如果对连接进行某些操作:

这些操作必须在eventloop对应的线程中执行,保证对连接的各项操作都是线程安全的。

1、如果执行的操作本就在线程中,就不需要将操作压入队列了,可以直接执行

2、如果执行的操作不在线程中,才需要加入任务池,等待事件处理完了然后执行任务

class EventLoop {private:using Functor = std::function<void()>;std::thread::id _thread_id;//线程ID--用于判断某个操作是否在该 EventLoop 对应的线程中执行 是就在线程里面执行 不是就压入线程池int _event_fd;//eventfd唤醒IO事件监控有可能导致的阻塞std::unique_ptr<Channel> _event_channel;//一个智能指针,指向与 _event_fd 相关联的 Channel 对象,用于管理 eventfd 的事件和回调Poller _poller;//进行所有描述符的事件监控std::vector<Functor> _tasks;//任务池 用于存储待执行的任务队列,每个任务都是一个 Functor 类型的函数对象std::mutex _mutex;//实现任务池操作的线程安全TimerWheel _timer_wheel;//定时器模块public://执行放入任务池中的所有任务//RunAllTask 函数通常在 EventLoop 对应的线程中执行,也就是说任务的执行是在单线程环境下进行的。//在单线程环境中,不存在多个线程同时访问和修改共享资源的问题,因此可以避免线程安全问题void RunAllTask() {std::vector<Functor> functor;{std::unique_lock<std::mutex> _lock(_mutex); _tasks.swap(functor);//交换完之后functor里面就是任务,_tasks里面就是空的了  对共享资源进行加锁}for (auto &f : functor) {  //这样做的好处是可以在解锁后再执行任务,减少锁的持有时间,提高程序的并发性能。f();}return ;}static int CreateEventFd() {//eventfd用于创建一个文件描述符int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); //CreateEventFd()返回值efd就赋值给了_event_fdif (efd < 0) {ERR_LOG("CREATE EVENTFD FAILED!!");abort();//让程序异常退出}return efd;}void ReadEventfd() {//读取efduint64_t res = 0;int ret = read(_event_fd, &res, sizeof(res));//从 _event_fd 中读取数据到 res 中 把数据读取出来进行清0 不读取就一直提示有数据可读if (ret < 0) {//EINTR -- 被信号打断;   EAGAIN -- 表示无数据可读if (errno == EINTR || errno == EAGAIN) {return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return ;}void WeakUpEventFd() {//向 eventfd 写入数据,从而唤醒可能因没有事件就绪而阻塞的 IO 事件监控uint64_t val = 1;int ret = write(_event_fd, &val, sizeof(val));//_event_fd 写入 val 的值 写入数据了 就不会阻塞了if (ret < 0) {if (errno == EINTR) {return;}ERR_LOG("READ EVENTFD FAILED!");abort();}return ;}public:EventLoop():_thread_id(std::this_thread::get_id()), //获取当前线程的 ID 并赋值给 _thread_id_event_fd(CreateEventFd()), //CreateEventFd()返回值efd就赋值给了_event_fd_event_channel(new Channel(this, _event_fd)),_timer_wheel(this) {//给eventfd添加可读事件回调函数,读取eventfd事件通知次数_event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this));//启动eventfd的读事件监控_event_channel->EnableRead();}//启动EventLoop模块//三步走--事件监控-》就绪事件处理-》执行任务void Start() {  //整个EventLoop流程while(1) {//1. 事件监控, std::vector<Channel *> actives;_poller.Poll(&actives);//Poller类中的Poll函数 开始监控并返回活跃连接//2. 事件处理。 for (auto &channel : actives) { //actives 活跃连接channel->HandleEvent();//进行事件处理 不同事件进行不同处理}//3. 执行任务RunAllTask();}}//用于判断当前线程是否是EventLoop对应的线程;bool IsInLoop() {return (_thread_id == std::this_thread::get_id());//_thread_id EventLoop创建时的id  std::this_thread::get_id()获取当前线程的id}void AssertInLoop() {assert(_thread_id == std::this_thread::get_id());}//判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列。void RunInLoop(const Functor &cb) { //给我提供一个任务 他在线程中就执行他 他不在就将他压入线程池中if (IsInLoop()) {return cb(); //在就执行任务函数}return QueueInLoop(cb); //不在 压入任务池}//将操作压入任务池void QueueInLoop(const Functor &cb) {{std::unique_lock<std::mutex> _lock(_mutex); //加锁 创建_lock对象的时候 会对_mutex加锁 保证在_lock生命收起内 _mutex保护的区域不会有别的线程访问_tasks.push_back(cb); //我们将任务压入到任务池中了,可是线程阻塞在事件监控,现在没有描述符就绪事件,那么事件监控就一直在等,等有事件了,才会处理任务,就会导致事件久久得不到执行}//因此需要 唤醒有可能因为没有事件就绪,而导致的epoll阻塞;-----  因为我们是先进行事件监控再进行任务执行//唤醒事件就绪-----其实就是给eventfd写入一个数据,eventfd就会触发可读事件(有事件就绪了,就不会再阻塞了)WeakUpEventFd();}//事件监控//添加/修改描述符的事件监控void UpdateEvent(Channel *channel) { return _poller.UpdateEvent(channel); }//移除描述符的监控void RemoveEvent(Channel *channel) { return _poller.RemoveEvent(channel); }void TimerAdd(uint64_t id, uint32_t delay, const TaskFunc &cb) { return _timer_wheel.TimerAdd(id, delay, cb); }void TimerRefresh(uint64_t id) { return _timer_wheel.TimerRefresh(id); }void TimerCancel(uint64_t id) { return _timer_wheel.TimerCancel(id); }bool HasTimer(uint64_t id) { return _timer_wheel.HasTimer(id); }
};void Channel::Remove() { return _loop->RemoveEvent(this); }
void Channel::Update() { return _loop->UpdateEvent(this); }

对以上代码模块的整体理解  梳理上面模块的关系:

#include "../source/server.hpp"
void HandleClose(Channel *channel)
{DBG_LOG("close fd:%d",channel->Fd());channel->Remove();//移除监控delete channel;
}
void HandleRead(Channel *channel)
{int fd = channel->Fd();char buf[1024] = {0};int ret = recv(fd, buf, 1023, 0);//读取数据if(ret <= 0){return HandleClose(channel);//关闭释放}DBG_LOG("%s", buf);channel->EnableWrite();//启动可写事件监控,以便后续可以向该套接字发送数据
}
void HandleWrite(Channel *channel)
{int fd = channel->Fd();const char *data = "天气还不错!";int ret = send(fd, data, strlen(data), 0);if(ret < 0){return HandleClose(channel);关闭释放}channel->DisableWrite();//关闭写监控 因为数据已经发送完,不需要再监控写事件(可写)就是向套接字的发送缓冲区写数据
}
void HandleEvent(EventLoop *loop, Channel *channel,uint64_t timerid)
{loop->TimerRefresh(timerid);
}
void Acceptor(EventLoop *loop, Channel *lst_channel)
{int fd = lst_channel->Fd();int newfd = accept(fd,NULL, NULL);if(newfd < 0) {return;}uint64_t timerid = rand() % 10000; //生成一个随机的定时器 ID 范围在 0 到 9999 之间Channel *channel = new Channel(loop, newfd);//创建一个新的 Channel 类对象,关联 EventLoop 对象 loop 和新连接的文件描述符 newfdchannel->SetReadCallback(std::bind(HandleRead,channel));//为通信套接字设置可读事件回调函数为 HandleWrite 函数channel->SetWriteCallback(std::bind(HandleWrite,channel));//可写事件的回调函数channel->SetCloseCallback(std::bind(HandleClose,channel));//关闭事件的回调函数//channel->SetErrorCallback(std::bind(HandleError,channel));//错误事件的回调函数channel->SetEventCallback(std::bind(HandleEvent,loop,channel,timerid));//任意事件的回调函数//非活跃连接的超时释放操作,10s后关闭连接//注意:定时销毁任务,必须在启动读事件之前,因为可能启动了事件监控之后,立即有了事件,但是这时候还没有任务loop->TimerAdd(timerid, 10, std::bind(HandleClose,channel));//添加一个定时任务 timerid 为定时器 ID,10 表示超时时间为 10 秒 回调函数为 HandleClose 函数,用于在 10 秒后关闭该通道channel->EnableRead();//启动新通道的可读事件监控,以便接收新连接上的数据
}
int main()
{srand(time(NULL));EventLoop loop;Socket lst_sock;lst_sock.CreateServer(8500);//创建一个监听再8500端口的服务器套接字Channel channel(&loop, lst_sock.Fd());channel.SetReadCallback(std::bind(Acceptor,&loop,&channel));//设置监听通道的可读事件回调函数为 Acceptor 函数channel.EnableRead();//启动监听通道的可读事件监控,以便接受新的连接请求while(1){loop.Start();//启动事件循环,处理各种事件(如连接请求、数据读写等)}lst_sock.Close();//调用 Socket 类的 Close 方法,关闭监听套接字return 0;
}

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com