您的位置:首页 > 财经 > 产业 > 【Linux】第十八章 Reactor模式

【Linux】第十八章 Reactor模式

2024/11/17 15:45:12 来源:https://blog.csdn.net/YQ20210216/article/details/141562090  浏览:    关键词:【Linux】第十八章 Reactor模式

文章目录

  • Reactor模式
  • epoll ET服务器(Reactor模式)
    • 设计思路
    • Epoller.hpp
    • Sock.hpp
    • Protocol.hpp
    • Service.hpp
    • TcpServer.hpp-重点
      • Connection类
      • TcpServer类
        • 服务器框架
        • TcpServer构造
        • AddConnection函数
        • SetNonBlock 函数
        • Accepter函数
        • IsExists函数
        • TcpRecver函数
        • TcpSender函数
        • EnableReadWrite函数
        • TcpExcepter函数
        • Dispatcher函数
        • TcpServer析构
    • main.cc
      • HandlerRequest函数
      • BeginHandler函数
    • 测试


Reactor模式

  • 单 Reactor 单线程,前台接待员和服务员是同一个人,全程为顾客服务
  • 单 Reactor 多线程,1 个前台接待员,多个服务员,接待员只负责接待
  • 主从 Reactor 多线程,多个前台接待员,多个服务员

特点

  • 响应快,不必为单个同步事件所阻塞
  • 避免了多线程 或 进程的切换开销

主要使用单 Reactor 单线程,相当于请求到来时,判断请求是各种事件,然后将请求和事件和回调方法结合存放到红黑树当中,当时间就绪的时候回调对应事件的处理方法

epoll ET服务器(Reactor模式)

设计思路

Epoller.hpp

对 epoll 的三个系统调用函数进行一定的封装

#pragma once
#include <iostream>
#include <cerrno>
#include <cstdlib>
#include <unistd.h>
#include <sys/epoll.h>class Epoller
{
public:static const int gsize = 128;
public:static int CreateEpoller(){int epfd = epoll_create(gsize);// 创建对应size的epfdif (epfd < 0){cout<<"epoll_create :" <<errno<< ':'<< strerror(errno)<<endl;exit(3);}return epfd;}static bool AddEvent(int epfd, int sock, uint32_t event){struct epoll_event ev;ev.events = event;ev.data.fd = sock;int n = epoll_ctl(epfd, EPOLL_CTL_ADD, sock, &ev);// 给对应的socket添加到epoll中return n == 0;}static bool ModEvent(int epfd, int sock, uint32_t event){struct epoll_event ev;ev.events = event;ev.data.fd = sock;int n = epoll_ctl(epfd, EPOLL_CTL_MOD, sock, &ev);// 修改已有scoket的eventreturn n == 0;}static bool DelEvent(int epfd, int sock){int n = epoll_ctl(epfd, EPOLL_CTL_DEL, sock, nullptr);// 删除指定socketreturn n == 0;}static int LoopOnce(int epfd, struct epoll_event revs[], int num){// 单次wait的调用,从数组里面取回就绪的文件描述符int n = epoll_wait(epfd, revs, num, -1);if(n == -1){cout<<"epoll_wait : %d : %s" <<errno<< ':'<< strerror(errno)<<endl;}return n;}
};

Sock.hpp

有关套接字初始化,绑定,监听,接收

#pragma once#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <cstdio>
#include <cstring>
#include <signal.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <pthread.h>
#include <cerrno>
#include <cassert>class Sock
{
public:static int SocketInit(){int listenSock = socket(PF_INET, SOCK_STREAM, 0);if (listenSock < 0){exit(1);}int opt = 1;setsockopt(listenSock, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));return listenSock;}static void Bind(int socket, uint16_t port){struct sockaddr_in local; // 用户栈memset(&local, 0, sizeof local);local.sin_family = PF_INET;local.sin_port = htons(port);local.sin_addr.s_addr = INADDR_ANY;// 2.2 本地socket信息,写入sock_对应的内核区域if (bind(socket, (const struct sockaddr *)&local, sizeof local) < 0){exit(2);}}static void Listen(int socket,int gbacklog){if (listen(socket, gbacklog) < 0){exit(3);}}static int Accept(int socket, std::string *clientip, uint16_t *clientport){struct sockaddr_in peer;socklen_t len = sizeof(peer);int serviceSock = accept(socket, (struct sockaddr *)&peer, &len);if (serviceSock < 0){// 获取链接失败return -1;}if(clientport) *clientport = ntohs(peer.sin_port);if(clientip) *clientip = inet_ntoa(peer.sin_addr);return serviceSock;}
};

Protocol.hpp

有关序列化反序列化协议

#pragma once
#include <iostream>
#include <vector>
#include <cstring>
#include <string>
#include <cstdio>#define SEP 'X'
#define SEP_LEN sizeof(SEP)#define CRLF "\r\n"
#define CRLF_LEN strlen(CRLF) 
#define SPACE " "
#define SPACE_LEN strlen(SPACE)// 分离独立报文
void PackageSplit(std::string &inbuffer, std::vector<std::string> *result)
{while (true){std::size_t pos = inbuffer.find(SEP);if (pos == std::string::npos)break;result->push_back(inbuffer.substr(0, pos));inbuffer.erase(0, pos + SEP_LEN);}
}struct Request
{int x;int y;char op;
};struct Response
{int code;int result;
};bool Parser(std::string &in, Request *req)
{// 1 + 1, 2 * 4, 5 * 9, 6 *1std::size_t spaceOne = in.find(SPACE);if (std::string::npos == spaceOne)return false;std::size_t spaceTwo = in.rfind(SPACE);if (std::string::npos == spaceTwo)return false;std::string dataOne = in.substr(0, spaceOne);std::string dataTwo = in.substr(spaceTwo + SPACE_LEN);std::string oper = in.substr(spaceOne + SPACE_LEN, spaceTwo - (spaceOne + SPACE_LEN));if (oper.size() != 1)return false;// 转成内部成员req->x = atoi(dataOne.c_str());req->y = atoi(dataTwo.c_str());req->op = oper[0];return true;
}void Serialize(const Response &resp, std::string *out)
{std::string ec = std::to_string(resp.code);std::string res = std::to_string(resp.result);*out = ec;*out += SPACE;*out += res;*out += CRLF;
}

Service.hpp

业务

#pragma once
#include "Protocol.hpp"
#include <functional>using service_t = std::function<Response (const Request &req)>;static Response calculator(const Request &req)
{Response resp = {0, 0};switch (req.op){case '+':resp.result = req.x + req.y;break;case '-':resp.result = req.x - req.y;break;case '*':resp.result = req.x * req.y;break;case '/':{ // x_ / y_if (req.y == 0)resp.code = -1; // -1. 除0elseresp.result = req.x / req.y;}break;case '%':{ // x_ / y_if (req.y == 0)resp.code = -2; // -2. 模0elseresp.result = req.x % req.y;}break;default:resp.code = -3; // -3: 非法操作符break;}return resp;
}

TcpServer.hpp-重点

Connection类

将客户端发送的数据拼接,每一个 sock 都要被封装为一个 Connection 对象

#pragma once
#include <iostream>
#include <string>
#include <vector>
#include <cerrno>
#include <unordered_map>
#include <functional>
#include "Sock.hpp"
#include "Epoller.hpp"
#include "Protocol.hpp"class Connection;
class TcpServer;using func_t = std::function<int(Connection *)>;
typedef int (*callback_t)(Connection *, string &);
000000
//class Connection
class Connection
{
public:int sock_;              // I/O 文件描述符TcpServer *R_;          // 主服务器的类指针std::string inbuffer_;  // 接收缓冲区std::string outbuffer_; // 发送缓冲区func_t recver_;         // 读事件回调函数func_t sender_;         // 写事件回调函数func_t excepter_;       // 异常事件回调函数public:Connection(int sock, TcpServer *r) : sock_(sock), R_(r){}void SetRecver(func_t recver) { recver_ = recver; }void SetSender(func_t sender) { sender_ = sender; }void SetExcepter(func_t excepter) { excepter_ = excepter; }~Connection() {}
};
//class TcpServer

TcpServer类

服务器框架
//class TcpServer
static void SetNonBlock(int fd){}
class TcpServer
{
public:TcpServer(callback_t cb, int port = 8080) : cb_(cb){}void AddConnection(int sockfd, uint32_t event, func_t recver, func_t sender, func_t excepter){}int Accepter(Connection *conn){}int TcpRecver(Connection *conn){}int TcpSender(Connection *conn){}int TcpExcepter(Connection *conn){}bool IsExists(int sock){}// 打开或者关闭对于特定socket是否要关心读或者写// EnableReadWrite(sock, true, false);// EnableReadWrite(sock, true, true);void EnableReadWrite(int sock, bool readable, bool writeable){}// 根据就绪事件,将事件进行事件派发void Dispatcher(){}void Run(){}~TcpServer(){if (listensock_ != -1)close(listensock_);if (epfd_ != -1)close(epfd_);delete[] revs_;}private:// 接收队列的长度static const int revs_num = 64;// 1. 网络socketint listensock_;// 2. epollint epfd_;// 3. 用哈希表保存连接unordered_map<int, Connection *> connections_;// 4. 就绪事件的件描述符的数组struct epoll_event *revs_;// 5. 设置完整报文的处理方法callback_t cb_;
};
TcpServer构造

创建 listensock 和创建 Epoll 对象,还要为每个socket封装为 Connection 对象

TcpServer(callback_t cb, int port = 8080) : cb_(cb){// 为保存就绪事件的数组申请空间revs_ = new struct epoll_event[revs_num];// 获取 listensocklistensock_ = Sock::SocketInit();SetNonBlock(listensock_);Sock::Bind(listensock_, port);Sock::Listen(listensock_, 20);// 多路转接epfd_ = Epoller::CreateEpoller();// 添加 listensock 到服务器中AddConnection(listensock_, EPOLLIN | EPOLLET,std::bind(&TcpServer::Accepter, this, std::placeholders::_1), nullptr, nullptr);}
AddConnection函数

当socket准备就绪,则通过哈希表就能找到Connection 对象,可以调用回调方法,设置EPOLLET 的 ET 模式

    void AddConnection(int sockfd, uint32_t event, func_t recver, func_t sender, func_t excepter){//设置 sock 为非阻塞if (event & EPOLLET)SetNonBlock(sockfd);// 添加sockfd到epollEpoller::AddEvent(epfd_, sockfd, event);// 将sockfd匹配的Connection也添加到当前的unordered_map中Connection *conn = new Connection(sockfd, this);conn->SetRecver(recver);conn->SetSender(sender);conn->SetExcepter(excepter);//将 Connection 对象的地址插入到哈希表connections_.insert(std::make_pair(sockfd, conn));cout << "添加新链接到connections成功: " << sockfd << endl;}
SetNonBlock 函数

设置 sock 为非阻塞

static void SetNonBlock(int fd)
{int fl = fcntl(fd, F_GETFL);fcntl(fd, F_SETFL, fl | O_NONBLOCK);
}
Accepter函数

不止处理一个连接所以要循环处理,建立连接, socket变成了一个普通的 I/O ,epoll 会监视IO

    int Accepter(Connection *conn){while (true){std::string clientip;uint16_t clientport = 0;int sockfd = Sock::Accept(conn->sock_, &clientip, &clientport);if (sockfd < 0){// 接收函数被事件打断了if (errno == EINTR)continue;// 取完了else if (errno == EAGAIN || errno == EWOULDBLOCK)break;else{cout << "accept error" << endl;return -1;}}cout << "get a new link: " << sockfd << endl;//将 sock 交给 TcpServer 监视,并注册回调函数AddConnection(sockfd, EPOLLIN | EPOLLET,std::bind(&TcpServer::TcpRecver, this, std::placeholders::_1),std::bind(&TcpServer::TcpSender, this, std::placeholders::_1),std::bind(&TcpServer::TcpExcepter, this, std::placeholders::_1));}return 0;}
IsExists函数

在处理某一个链接的时候,我们必须要保证这个链接在已有的 map 里面,否则代表这个链接已经被关闭或者异常退出了;同理,在异常和关闭链接的处理流程中,我们也需要将链接从 map 中删除

 bool IsExists(int sock){auto iter = connections_.find(sock);if (iter == connections_.end())return false;elsereturn true;}
TcpRecver函数

对于读事件而言我们也是进行循环读取,该文件描述符也需要被设置为非阻塞。读取的内容拼接到该 Connection 对象的输入缓冲区 string 中;

在读取完毕后,我们需要在协议里面定义一个根据应用层协议字段来分离报文的函数(避免 tcp 的粘包问题),最终会得到一个 string 的数组,每个数组成员都是一个完整的报文;

最后,我们直接一个 for 循环,通过该 tcpserver 对象在初始化时候设置的 cb_函数回调指针,来处理每一个报文

    int TcpRecver(Connection *conn){while (true){char buffer[1024];ssize_t s = recv(conn->sock_, buffer, sizeof(buffer) - 1, 0);if (s > 0){buffer[s] = 0;conn->inbuffer_ += buffer;}else if (s == 0){cout << "client quit" << endl;conn->excepter_(conn);break;}else{// 接收事件被打断if (errno == EINTR)continue;// 接收缓冲区空了else if (errno == EAGAIN || errno == EWOULDBLOCK)break;else{// 出错了cout << "recv error: " << errno << endl;conn->excepter_(conn);break;}}}// 将本轮全部读取完毕std::vector<std::string> result;PackageSplit(conn->inbuffer_, &result);for (auto &message : result){cb_(conn, message);}return 0;}
TcpSender函数

将上层业务处理好后的数据发送给客户端

    int TcpSender(Connection *conn){while (true){ssize_t n = send(conn->sock_, conn->outbuffer_.c_str(), conn->outbuffer_.size(), 0);if (n > 0){// 去除已经成功发送的数据conn->outbuffer_.erase(0, n);}else{// 写入操作被打断if (errno == EINTR)continue;// 写入缓冲区满了,没办法继续写else if (errno == EAGAIN || errno == EWOULDBLOCK)break;else{conn->excepter_(conn);cout << "send error: " << errno << endl;break;}}}return 0;}
EnableReadWrite函数
// 打开或者关闭对于特定socket是否要关心读或者写// EnableReadWrite(sock, true, false);// EnableReadWrite(sock, true, true);void EnableReadWrite(int sock, bool readable, bool writeable){uint32_t event = 0;event |= (readable ? EPOLLIN : 0);event |= (writeable ? EPOLLOUT : 0);Epoller::ModEvent(epfd_, sock, event);}
TcpExcepter函数

在这个函数体内,会将链接从 epoll 中删除、关闭链接、释放 connection 对象、将文件描述符从 map 里面剔除;

需要注意的是,一定要先将 socket 从 epoll 里面剔除掉,再关闭 socket

    int TcpExcepter(Connection *conn){// 0.判断有效性if (!IsExists(conn->sock_))return -1;5 /// 1.删除epoll的监看Epoller::DelEvent(epfd_, conn->sock_);cout << "remove epoll event!" << endl;// 2.closeclose(conn->sock_);cout << "close fd: " << conn->sock_ << endl;// 3. delete conndelete connections_[conn->sock_];cout << "delete connection object done" << endl;// 4.erase connconnections_.erase(conn->sock_);cout << "erase connection from connections" << endl;return 0;}
Dispatcher函数

一次的运行就是调用一次 epoll_wait,再根据事件就绪的文件描述符,调用不同的事件处理函数

    // 根据就绪事件,将事件进行事件派发void Dispatcher(){int n = Epoller::LoopOnce(epfd_, revs_, revs_num);for (int i = 0; i < n; i++){int sock = revs_[i].data.fd;uint32_t revent = revs_[i].events;// 判断是否出现错误,如果出现了错误,那就把EPOLLIN和OUT都加上// 这样这个链接会进入下面的处理函数,并在处理函数中出现异常// 处理函数中出现异常回统一调用TcpExcpter函数if (revent & EPOLLHUP)revent |= (EPOLLIN | EPOLLOUT);if (revent & EPOLLERR)revent |= (EPOLLIN | EPOLLOUT);if (revent & EPOLLIN){if (IsExists(sock) && connections_[sock]->recver_)connections_[sock]->recver_(connections_[sock]);}// 当链接的写事件被激活的时候,在这里就会触发写事件的处理// 所以并不需要在recv里面主动调用写事件处理函数// 只需要告诉epoll让它帮我们监控写事件,那么就会在这里触发写操作if (revent & EPOLLOUT){if (IsExists(sock) && connections_[sock]->sender_)connections_[sock]->sender_(connections_[sock]);}}}void Run(){while (true){Dispatcher();}}
TcpServer析构

将 listensocket 和 epfd 两个文件描述符关闭,并析构掉链接数组

    ~TcpServer(){if (listensock_ != -1)close(listensock_);if (epfd_ != -1)close(epfd_);delete[] revs_;}

main.cc

要做的就是获取到命令行参数的端口,然后创建 tcpserver 对象并绑定事件处理函数

#include <memory>
#include "TcpServer.hpp"
#include "Service.hpp"using namespace std;
static void usage(std::string process)
{cerr << "\nUsage: " << process << " port\n"<< endl;
}
int BeginHandler(Connection *conn, std::string &message, service_t service)
{
}// 1 + 1X2 + 3X5 + 6X
int HandlerRequest(Connection *conn, std::string &message)
{
}
//./test 8080
int main(int argc, char *argv[])
{if (argc != 2){usage(argv[0]);exit(0);}TcpServer svr(HandlerRequest, atoi(argv[1]));svr.Run();return 0;
}

HandlerRequest函数

封装

// 1 + 1X2 + 3X5 + 6X8 -> 1 + 1
int HandlerRequest(Connection *conn, std::string &message)
{// beginhandler里面是具体的调用逻辑,calculator是本次事务处理函数return BeginHandler(conn, message, calculator);
}

BeginHandler函数

这里传入来的 message 肯定是一个完整的应用层报文

int BeginHandler(Connection *conn, std::string &message, service_t service)
{// message一定是一个完整的报文,因为我们已经对它进行了解码Request req;// 反序列化,进行处理的问题if (!Parser(message, &req)){// 写回错误消息return -1;}// 业务逻辑Response resp = service(req);std::cout << req.x << " " << req.op << " " << req.y << std::endl;std::cout << resp.code << " " << resp.result << std::endl;// 序列化std::string sendstr;Serialize(resp, &sendstr);// 处理完毕的结果,发送回给clientconn->outbuffer_ += sendstr;conn->sender_(conn);if(conn->outbuffer_.empty()) conn->R_->EnableReadWrite(conn->sock_, true, false);else conn->R_->EnableReadWrite(conn->sock_, true, true);std::cout << "--- end ---" << std::endl;return 0;
}

测试

设备1启动服务器后,设备2连接,并发送业务

//设备1
[aaa@VM-8-14-centos ~]$ ./test
level[DEBUG], time[1724643960] add listensock[3] to epoll success...
level[DEBUG], time[1724643975] accept client[127.0.0.1:35818] success, add to epoll of TcpServer success, sock: 5
level[DEBUG], time[1724643980] for sock[5] called Excepter() OK...
//设备2
[aaa@VM-8-14-centos ~]$ telnet 127.0.0.1 8080
Trying 127.0.0.1...
Connected to 127.0.0.1.
Escape character is '^]'.
1 + 1X 2 * 2X
code:0 result:2Xcode:3 result:0XConnection closed by foreign host.

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com