您的位置:首页 > 汽车 > 新车 > 北京优酷首页培训机构_上海公司章程在哪里下载_怎么引流怎么推广自己的产品_宁波seo网络推广推荐

北京优酷首页培训机构_上海公司章程在哪里下载_怎么引流怎么推广自己的产品_宁波seo网络推广推荐

2024/12/26 22:53:29 来源:https://blog.csdn.net/2404_87273268/article/details/144377286  浏览:    关键词:北京优酷首页培训机构_上海公司章程在哪里下载_怎么引流怎么推广自己的产品_宁波seo网络推广推荐
北京优酷首页培训机构_上海公司章程在哪里下载_怎么引流怎么推广自己的产品_宁波seo网络推广推荐

文章目录

  • 前言
  • Channel
    • Channel代码
      • Channel.h
      • Channel.cc
  • Poller
    • Poller代码
      • Poller.h
      • Poller.cc
  • EpollPoller
    • EpollPoller代码
      • EpollPoller.h
      • EpollPoller.cc
  • EventLoop
    • EventLoop代码
      • EventLoop.h
      • EventLoop.cc
    • 类图

前言

重写Muduo库实现核心模块的Git仓库

注:本文将重点剖析 Muduo 网络库的核心框架,深入探讨作者精妙的代码设计思路,并针对核心代码部分进行重写,将原本依赖 boost 的实现替换为原生的 C++11 语法。需要说明的是,本文并不打算对整个 Muduo 库进行完整的重写。Muduo库源码链接

在前文中,我们对Muduo的基础模块LoggerTimestampBuffer有了初步了解(如果还未阅读过的同学可先参考这篇文章进行回顾)。接下来,我们将深入探讨Muduo的事件循环相关模块,包括:

  • Channel
  • Poller
  • EpollPoller
  • EventLoop

通过对这些模块的剖析,我们将逐步了解事件循环的运行机制和高效的事件分发策略,从而进一步理解 Muduo 在构建高性能服务器时的设计思路与实现细节。接下来,我们将以事件循环的主执行单元 EventLoop 为线索,分别阐述 ChannelPollerEpollPollerEventLoop 的职责、设计理念以及交互关系。

Channel

ChannelMuduo 事件循环框架中连接文件描述符事件处理逻辑的中间抽象层。其主要作用是为某个特定的 I/O 通道(通常是一个套接字文件描述符)绑定对应的事件回调函数,并协助 EventLoop 对发生的事件进行分发处理。

简而言之,Channel 并不关心 I/O 操作的具体过程,它更像是一个“桥梁”或“适配器”,将底层的文件描述符事件与上层的业务回调关联起来。它的核心功能和责任包括:

  1. 文件描述符与事件的关联
    Channel 持有一个文件描述符(如套接字)和一组感兴趣的事件类型(如可读事件、可写事件)。当底层 I/O 多路复用机制(如 epoll)检测到该文件描述符有事件发生时,会通知对应的 Channel
  2. 事件回调的注册与触发
    Channel 不负责事件的检测,但负责事件的回调分发。当 Poller (如 EpollPoller) 返回已发生的事件时,EventLoop 会调用 Channel 的回调函数(如读回调、写回调、关闭回调、错误回调)。借助 Channel,框架实现了对不同 I/O 通道的统一管理。
  3. EventLoopPoller协作:
    EventLoop 中维护一组与当前线程相关的 Channel。每个 Channel 会在内部存储自己的感兴趣事件,并在必要时由 EventLoop 通过 Poller 注册、修改、删除对相应文件描述符的监听。当事件发生时,Poller 收集事件并通知 EventLoopEventLoop 随后调用 Channel 的相应回调函数。
  4. 简化上层逻辑
    有了 Channel 这个抽象,上层应用代码只需负责定义事件发生时的处理逻辑(即回调函数),而不必直接与底层的多路复用接口交互。这大大简化了事件循环框架的使用难度和代码复杂度。

总之,ChannelMuduo 中连接底层事件检测机制上层事件处理逻辑的关键组件,通过统一的接口与抽象,它极大地降低了事件处理的复杂性,使用户可以更直观地编写网络事件处理代码。

Channel代码

Channel.h

class Channel : noncopyable
{
public:using EventCallback = std::function<void()>;using ReadEventCallback = std::function<void(Timestamp)>;Channel(EventLoop* eventloop, int fd);~Channel();void handleEvent(Timestamp receiveTime);void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); }void setWriteCallback(EventCallback cb)   { writeCallback_ = std::move(cb); }void setCloseCallback(EventCallback cb)   { closeCallback_ = std::move(cb); }void setErrorCallback(EventCallback cb)   { errorCallback_ = std::move(cb); }// 将channel绑定到shared_ptr管理的owner对象,// 防止在handleEvent中销毁owner对象void tie(const std::shared_ptr<void>&);int fd() const { return fd_; }int events() const { return events_; }// 被poller调用设置已就绪的事件void set_revents(int revt) { revents_ = revt; }bool isNonEvent() const { return events_ == kNoneEvent; }void enableReading() { events_ |= kReadEvent; update();}void disableReading(){ events_ &= ~kReadEvent; update();}void enableWriting() { events_ |= kWriteEvent; update();}void disableWriting(){ events_ &= ~kWriteEvent; update();}void disableAll()    { events_ = kNoneEvent; update();}bool isReading()const{ return events_ & kReadEvent; }bool isWriting()const{ return events_ & kWriteEvent; }int index() const { return index_; }void set_index(int index) { index_ = index; }EventLoop* ownerLoop() const { return loop_; }void remove();
private:void update();void handleEventWithGuard(Timestamp receiveTime);private:static const int kNoneEvent; // 无事件static const int kReadEvent; // 读事件static const int kWriteEvent;// 写事件EventLoop* loop_;   // 该Channel所绑定的EventLoopconst int fd_;      // 封装的fdint events_;        // 注册事件int revents_;       // 就绪事件/*index_描述当前Channel的状态:-1 : 新添加,还未注册到epoll1 : 已注册并添加到epoll2 : 已删除 */int index_;// 通过weak_ptr将Channel绑定到TcpConnectionstd::weak_ptr<void> tie_;	// 是否绑定bool tied_;ReadEventCallback readCallback_;// 读回调EventCallback writeCallback_;	// 写回调EventCallback closeCallback_;	// 关闭回调EventCallback errorCallback_;	// 错误回调
};

解析:

几个重要的对外接口:

  • 几个设置回调函数的公有函数
  • tie:将Channel绑定到给定的参数
  • set_revents:用来设置Channel的就绪事件
  • handleEvent:根据就绪事件revents,执行回调函数

Channel.cc

const int Channel::kNoneEvent = 0;
// EPOLLPRI: 当文件描述符上有紧急数据时,触发
const int Channel::kReadEvent = EPOLLIN | EPOLLPRI;
const int Channel::kWriteEvent = EPOLLOUT;Channel::Channel(EventLoop *eventloop, int fd) :loop_(eventloop),fd_(fd), events_(0),revents_(0),index_(-1),tied_(false)
{}Channel::~Channel() {}void Channel::handleEvent(Timestamp receiveTime)
{/*处理回调函数的向外提供的接口在EventLoop中调用*/if(tied_){std::shared_ptr<void> guard = tie_.lock();  // 提升if(guard){handleEventWithGuard(receiveTime);}}else{handleEventWithGuard(receiveTime);}
}// 在TcpConnection::connectEstablish中调用channel::tie
void Channel::tie(const std::shared_ptr<void>& obj)
{tie_ = obj;tied_ = true;
}void Channel::remove()
{loop_->removeChannel(this);
}// 当改变channel所表示的fd的events后,需要在poller里面更改fd相应的事件epoll_ctl
void Channel::update()
{loop_->updateChannel(this);
}void Channel::handleEventWithGuard(Timestamp receiveTime)
{/*调用回调函数的实施接口*/if((revents_ & EPOLLHUP) && !(revents_ & EPOLLIN)){/*EPOLLHUP: 表示文件描述符挂起事件,通常表示远端关闭连接*/if(closeCallback_){closeCallback_();}}if(revents_ & EPOLLERR){if(errorCallback_){errorCallback_();}}if(revents_ & (EPOLLIN | EPOLLPRI)){if(readCallback_){readCallback_(receiveTime);}}if(revents_ & EPOLLOUT){if(writeCallback_){writeCallback_();}}
}

Poller

PollerMuduo 中对底层事件分发器(Demultiplexer,如 epollpoll 等机制)的抽象封装。它负责对已注册的文件描述符进行监听,并在事件准备就绪(如套接字可读或可写)时返回相应事件。通过这种抽象,EventLoop 无需直接与底层的 I/O 多路复用机制交互,而可以统一调用 Poller 接口进行事件的获取和分发,从而提升代码的可扩展性与可维护性。

Poller代码

Poller.h

class Poller : noncopyable
{
public:using ChannelList = std::vector<Channel*>;Poller(EventLoop*);virtual ~Poller() = default;// 给所有IO复用提供统一的接口virtual Timestamp poll(int timeout, ChannelList* activeChannels) = 0;virtual void updateChannel(Channel*) = 0;virtual void removeChannel(Channel*) = 0;bool hasChannel(Channel* channel) const;// Eventloop可以通过该接口获取默认的IO复用的对象static Poller* newDefaultPoller(EventLoop* loop);
protected:using ChannelMap = std::unordered_map<int, Channel*>; ChannelMap channels_;
private:EventLoop* ownerLoop_;
};

解析:

由于 Poller 是对事件分发器(Demultiplexer)的抽象,因此它作为抽象基类被其他具体的事件分发实现类(如 PollPollerEpollPoller)继承和扩展。为此,Poller 必须提供统一的接口规范,以便各具体实现类能够按照这一统一接口进行定制和实现。具体接口如下:

  • poll(int timeout, ChannelList* activeChannels):事件分发器的核心函数。对于不同的事件分发器具体类,该函数会调用各自底层的IO多路复用系统调用(如poll()epoll_wait())。当有事件就绪时,它会将相应的事件填充到activeChannels
  • updateChannel(Channel*) 用于在底层事件分发器中更新指定 Channel 对文件描述符所关注的事件类型(如可读、可写等)。
  • emoveChannel(Channel*) 则用于将指定的 Channel 从事件监听中移除。
  • newDefaultPoller(EventLoop* loop):此方法为静态方法,用于其派生类构造一个新的Poller

Poller.cc

Poller::Poller(EventLoop *loop) : ownerLoop_(loop){}bool Poller::hasChannel(Channel *channel) const 
{auto it = channels_.find(channel->fd());return it != channels_.end() && it->second == channel;
}

这里有个不同点在于,作者将newDefaultPoller方法定义到其他文件上(DefaultPoller.cc):

Poller* Poller::newDefaultPoller(EventLoop* loop)
{if(::getenv("MUDUO_USE_POLL")){return nullptr;}return new EpollPoller(loop);
}

EpollPoller

EpollPoller 是继承自 Poller 的具体事件分发器实现类。在我们的项目中将只实现 EpollPoller,而不涉及 PollPoller,这是因为 Muduo 默认使用 EpollPoller,并且相较于 pollepoll 的效率与性能表现更为优异。

epollIO多路复用的接口分别有:

  1. epoll_create
  2. epoll_ctl
    • EPOLL_CTL_ADD
    • EPOLL_CTL_MOD
    • EPOLL_CTL_DEL
  3. epoll_wait

EpollPoller对这几个系统调用进行了一系列封装,并针对 epoll_wait 函数的 struct epoll_event *events 参数采用了动态扩容策略。具体做法如下:

使用std::vector<struct epoll_event>作为events的承载容器,并初始分配16个元素的空间。当一次 epoll_wait 调用返回后,如果就绪事件的数量等于当前 vector 的容量,则将其容量扩充为原来的两倍,以应对后续可能出现的更多事件,从而避免频繁的内存分配操作。

EpollPoller代码

EpollPoller.h

class EpollPoller : public Poller
{
public:EpollPoller(EventLoop* loop);~EpollPoller() override;// 重写基类接口Timestamp poll(int timeout, ChannelList* activeChannels) override;void updateChannel(Channel*) override;void removeChannel(Channel*) override;private:void update(int operation, Channel* channel);void fillActiveChannels(int numEvents, ChannelList* activeChannel);private:using EventList = std::vector<struct epoll_event>;// Events初始化为16static const int kInitEventListSize = 16;int epollfd_;	// epoll_create返回的文件描述符// 作为epoll_wait中events的承载容器EventList events_;
};

EpollPoller.cc

/*表示Channel的状态
*/
const int kNew = -1;
const int kAdded = 1;
const int kDeleted = 2;EpollPoller::EpollPoller(EventLoop *loop):Poller(loop),// EPOLL_CLOEXEC:当进程执行exec时,自动关闭该文件描述符epollfd_(::epoll_create1(EPOLL_CLOEXEC)),events_(kInitEventListSize)
{if(epollfd_ < 0){LOG_ERROR("EPollPoller Create epollfd error\n");}
}EpollPoller::~EpollPoller()
{::close(epollfd_);
}/*
功能:向EventLoop提供的接口,监听哪些fd发生事件
参数:timeout(传入参数): 超时时间activeChannels(传出参数): 通过fillActiveChannels函数push所有发生事件的Channel
*/
Timestamp EpollPoller::poll(int timeout, ChannelList *activeChannels)
{//LOG_DEBUG("poll start");int numEvents = epoll_wait(epollfd_, &*events_.begin(), events_.size(), timeout);Timestamp now(Timestamp::now());int saveError = errno;if(numEvents > 0){LOG_DEBUG("%d Events happened", numEvents);fillActiveChannels(numEvents, activeChannels);if(numEvents == events_.size()){events_.resize(2 * events_.size());}}else if(numEvents == 0) {//LOG_DEBUG("%s timeout!", __FUNCTION__);}else{if(saveError != EINTR){errno = saveError;LOG_ERROR("EpollPoller::poll() err!");}}return now;
}/*
功能: 向EventLoop提供接口,修改Channel所注册的事件
*/
void EpollPoller::updateChannel(Channel* channel)
{const int index = channel->index(); LOG_INFO("EpollPoller::%s => fd=%d events=%d index=%d", __FUNCTION__, channel->fd(), channel->events(), channel->index());if(index == kNew || index == kDeleted){int fd = channel->fd();if(index == kNew){channels_[fd] = channel;}channel->set_index(kAdded);update(EPOLL_CTL_ADD, channel);}else{// update existing one with EPOLL_CTL_MOD/DELint fd = channel->fd();if(channel->isNonEvent()){update(EPOLL_CTL_DEL, channel);channel->set_index(kDeleted);}else{update(EPOLL_CTL_MOD, channel);}}
}/*
向EventLoop提供的接口,删除Channel
*/
void EpollPoller::removeChannel(Channel* channel)
{int fd = channel->fd();LOG_INFO("func=%s => fd=%d \n", __FUNCTION__, fd);channels_.erase(fd); int index = channel->index();if (index == kAdded){update(EPOLL_CTL_DEL, channel);}channel->set_index(kNew);
}
/*
功能:为Channel封装的文件描述符和Event注册进epoll的实施动作
参数:operation:1) EPOLL_CTL_ADD2) EPOLL_CTL_MOD3) EPOLL_CTL_DEL
*/
void EpollPoller::update(int operation, Channel *channel)
{struct epoll_event event;::bzero(&event, sizeof event);event.data.ptr = channel;event.events = channel->events();int fd = channel->fd();if(::epoll_ctl(epollfd_, operation, fd, &event) < 0){if(operation == EPOLL_CTL_DEL){LOG_ERROR("epollfd : %d, fd : %d op : %d, epoll_ctl error", epollfd_, fd, EPOLL_CTL_DEL);}else{LOG_FATAL("epollfd : %d, fd : %d op : %d, epoll_ctl error", epollfd_, fd, fd);}} 
}/*
功能:1.设置所有Channel的就绪事件Channel->revents2.向ChannelList中push发生事件的Channel 
*/
void EpollPoller::fillActiveChannels(int numEvents,ChannelList *activeChannel)
{for(int i = 0; i < numEvents; i++){Channel* channel = static_cast<Channel*>(events_[i].data.ptr);channel->set_revents(events_[i].events);activeChannel->push_back(channel);}
}

EventLoop

EventLoopMuduo 框架的核心组件之一,它主要负责在单线程模型下驱动整个事件循环系统正常运行,从而实现高效的事件处理和回调分发。MainLoopSubLoop运行在不同的线程下,符合one loop per thread模型,MainLoop负责接受新的连接(accept)和将客户端新连接封装好根据负载均衡算法指定SubLoop递送。故SubLoop负责后续客户端的读、写事件等。

EventLoop 类中有一个私有成员变量 (threadId_),用于缓存 EventLoop 所在的线程 ID,以确保线程安全性。这看似有些多余,因为我们通常遵循 “one loop per thread”(即一个线程只能运行一个 EventLoop)的设计理念。那么,为什么还要考虑线程安全问题呢?

原因在于,尽管每个 EventLoop 本身只在它所属的线程中运行,但在实际应用中会存在跨线程操作 EventLoop 的情况。例如,在 TcpServer 类中存在 MainLoopSubLoop 池。MainLoop 线程负责接收新连接,并通过轮询(负载均衡算法)将新连接分发给其他线程中的 SubLoop。这意味着 MainLoop 线程在运行中,需要访问其他线程中 EventLoop 的指针,进行相应的分发与调整。在多线程访问同一内存区域的情境下,线程同步问题便不可避免地出现。

为了解决此类问题,EventLoop 通过记录自身所在线程的 ID,来检查调用者是否在同一线程中执行操作。如果跨线程调用不合规,EventLoop 可以及时检测并采取相应措施(如断言失败或者通过安全的跨线程回调机制处理)。这样一来,即便保持 “one loop per thread” 的基本模型,也能在需要跨线程交互的场景中更好地控制与管理事件循环间的通信,从而确保系统的稳定性和安全性。

在实际运行中,当 SubLoop 没有事件需要处理时,它会一直阻塞在 epoll_wait 调用中等待事件。此时问题在于,MainLoop 如何向处于阻塞状态的 SubLoop 发出信号,让其知道有新客户端连接到来,需要 SubLoop 负责接收呢?

原作者的解决方案是:在 SubLoop 创建一个专门用于监听唤醒信号的文件描述符,这个文件描述符通过 eventfd 系统调用创建。SubLoopEventLoop 会持续监听该文件描述符上的可读事件。一旦有新的客户端连接建立,MainLoop 就会调用 EventLoop::wakeup 函数,向这个文件描述符中写入数据,触发可读事件,从而唤醒在 epoll_wait 上阻塞的 SubLoop。这样,SubLoop 就能够立即响应新连接的到来并进行处理。

wakeup 的作用仅限于唤醒 SubLoop(实际上只是在相关文件描述符中写入少量数据),除此之外并不会直接传递任何新客户端连接的信息。那么,MainLoop 又是如何将新客户端连接的信息传递给 SubLoop,并让 SubLoopPoller 为新连接的套接字进行注册呢?

答案是:在 MainLoop 中事先为 SubLoop 注册了一个回调函数。当 SubLoopwakeup 唤醒后,会调用 EventLoop::handleRead(),进而执行已经注册好的回调函数。若有多个此类回调函数,就需要借助一个队列对它们进行管理。

这个维护回调函数的队列正是 EventLoop::pendingFunctors_

EventLoop代码

EventLoop.h

class EventLoop : noncopyable
{
public:using Functor = std::function<void()>;EventLoop();~EventLoop();// 开启事件循环void loop();// 退出事件循环void quit();Timestamp pollReturnTime() const { return pollReturnTime_; }// 在当前loop中执行cbvoid runInLoop(Functor cb);void queueInLoop(Functor cb);// 唤醒loop所在的线程void wakeup();void updateChannel(Channel* channel);void removeChannel(Channel* channel);// 判断当前线程是否为创建EventLoop中的线程bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); }private:// WakeupFd的回调函数void handleRead();void doPendingFunctor();private:using ChannelList = std::vector<Channel*>;std::atomic_bool looping_;  // 是否在epoll_wait循环中std::atomic_bool quit_;     // 是否退出std::atomic_bool eventHanding_;// 是否在处理注册的回调函数std::atomic_bool callingPendingFunctors_; /* atomic */const pid_t threadId_;   // 用来标识此EventLoop的线程号Timestamp pollReturnTime_;// poller返回发生事件的时间点std::unique_ptr<Poller> poller_;// EventLoop绑定的poller// mainloop与subloop的通信方式, 在subloop中监听这个文件描述符,如果读事件就绪,代表mainloop派发了一个新的Channel int wakeupFd_;  // 监听wakeupFd的Channelstd::unique_ptr<Channel> wakeupChannel_;// epoll_wait中返回的活跃的Channel集合ChannelList activeChannels;std::mutex mutex_; // 保护pendingFunctors的线程安全std::vector<Functor> pendingFunctors_; // 存储此loop需要执行的所有回调函数
};

解析

  • 构造函数:
    SubLoop线程中,会在线程的(后面会讲)上构建一个EventLoop对象,从而调用构造函数。threadId_(CurrentThread::tid())其实就是为EventLoop对象绑定其所属线程ID。(CurrentThread 的具体实现将在后文讨论)。
  • loop()
    负责执行事件循环的主要逻辑,不断监听在poller_所注册的事件。若有事件发生,则执行该事件对应的回调函数,之后调用doPendingFunctor()方法。
  • runInLoop(Functor cb)
    该方法用于让 cb 回调函数在 EventLoop 所属的线程内执行。如果调用者不在其所属线程,则会通过 queueInLoopcb 注册到 pendingFunctors_ 队列中,以便后续在适当的时候被调用。
  • doPendingFunctor()
    该方法执行pendingFunctors_内的回调函数。作者用了一个很巧妙的代码思路:预先定义一个临时的 std::vector<Functor> 对象,并在临界区内使用 swap() 函数将 pendingFunctors_ 和临时对象进行交换。这样做的好处是,临界区内只需要执行 swap() 操作,而不需要逐一执行回调函数。
    按照正常逻辑,如果在临界区内直接执行 pendingFunctors_ 中的所有回调函数,会导致其他线程在尝试通过 queueInLoop() 注册新的回调时,必须等待所有回调函数执行完毕并且锁释放。这在多线程环境下会显著降低效率。通过交换操作,doPendingFunctor() 能够快速地将当前的回调函数队列转移到临时对象中,释放锁后再在临时对象上执行回调函数,从而大幅提升并发性能和整体效率。

EventLoop.cc

// 防止一个线程创建多个Eventloop
__thread EventLoop* t_loopInThisThread = nullptr;
// 定义默认的Poller IO复用接口的超时函数
const int kPoolTimeMs = 10000;int createEventfd()
{int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);if(evtfd< 0){LOG_FATAL("threadId : %d create eventfd error", CurrentThread::tid());}return evtfd;
}EventLoop::EventLoop() :looping_(false),quit_(false),eventHanding_(false),callingPendingFunctors_(false),threadId_(CurrentThread::tid()),poller_(Poller::newDefaultPoller(this)),wakeupFd_(createEventfd()),wakeupChannel_(new Channel(this, wakeupFd_)) 
{LOG_DEBUG("thread %d Eventloop created", CurrentThread::tid());if(t_loopInThisThread){LOG_FATAL("Another EventLoop %p exists in this thread %d.", t_loopInThisThread, threadId_);}else{t_loopInThisThread = this;}wakeupChannel_->setReadCallback(std::bind(&EventLoop::handleRead, this));wakeupChannel_->enableReading();
}EventLoop::~EventLoop()
{LOG_DEBUG("EventLoop %p of thread %d destructs in thread %d", this, threadId_, CurrentThread::tid());wakeupChannel_->disableAll();wakeupChannel_->remove();::close(wakeupFd_);t_loopInThisThread = nullptr;
}void EventLoop::loop()
{looping_ = true;quit_ = false;LOG_DEBUG("EventLoop %p start looping ", this);while(!quit_){activeChannels.clear();pollReturnTime_ = poller_->poll(kPoolTimeMs, &activeChannels);eventHanding_ = true;for(auto channel : activeChannels){// Poller监听哪些channel发生事件了,上报给Eventchannel->handleEvent(pollReturnTime_);}eventHanding_ = false;// 执行当前EventLoop事件循环需要处理的回调操作/**/doPendingFunctor();}LOG_DEBUG("EventLoop %p stop looping", this);looping_ = false;
}void EventLoop::quit()
{/*事件循环退出时机:1. 在自己的线程内调用2. 在其他线程内调用该函数2.1 如果EventLoop在阻塞中,wakeup可以唤醒进而退出循环*/quit_ = true;if(!isInLoopThread()){wakeup();}
}void EventLoop::runInLoop(Functor cb)
{/*让某个任务(回调函数cb)在当前线程内执行。主要用于线程安全和任务调度,确保任务的执行环境和EventLoop的线程一致*/if(isInLoopThread()){cb();}else{queueInLoop(std::move(cb));}
}void EventLoop::queueInLoop(Functor cb)
{/*将任务放入pendingFunctors:1. 外部线程调用queueInLoop -- !isInLoopThread2. 本线程正在执行回调函数 -- callingPendingFunctors_2.1 如果不添加这个判断条件,很有可能导致EventLoop一直阻塞*/{std::lock_guard<std::mutex> mutex(mutex_);pendingFunctors_.emplace_back(std::move(cb));}if (!isInLoopThread() || callingPendingFunctors_){wakeup();}
}void EventLoop::updateChannel(Channel *channel)
{poller_->updateChannel(channel);
}void EventLoop::removeChannel(Channel *channel)
{poller_->removeChannel(channel);
}void EventLoop::handleRead()
{u_int64_t one = 1;size_t n = ::read(wakeupFd_, &one, sizeof one);if(n != sizeof one){LOG_ERROR("EventLoop::handleRead() reads %ld bytes instead of 8", n);}
}void EventLoop::doPendingFunctor()
{std::vector<Functor> functors;callingPendingFunctors_ = true;{std::lock_guard<std::mutex> mutex(mutex_);functors.swap(pendingFunctors_);}for(const Functor& functor : functors){functor();}callingPendingFunctors_ = false;
}void EventLoop::wakeup()
{u_int64_t one = 1;size_t n = ::write(wakeupFd_, &one, sizeof one);if(n != sizeof one){LOG_ERROR("EventLoop::wakeup() writes %ld bytes instead of 8", n);}
}void EventLoop::handleRead()
{u_int64_t one = 1;size_t n = ::read(wakeupFd_, &one, sizeof one);if(n != sizeof one){LOG_ERROR("EventLoop::handleRead() reads %ld bytes instead of 8", n);}
}void EventLoop::doPendingFunctor()
{std::vector<Functor> functors;callingPendingFunctors_ = true;{std::lock_guard<std::mutex> mutex(mutex_);functors.swap(pendingFunctors_);}for(const Functor& functor : functors){functor();}callingPendingFunctors_ = false;
}void EventLoop::wakeup()
{u_int64_t one = 1;size_t n = ::write(wakeupFd_, &one, sizeof one);if(n != sizeof one){LOG_ERROR("EventLoop::wakeup() writes %ld bytes instead of 8", n);}
}

类图

image-20241210162647589

版权声明:

本网仅为发布的内容提供存储空间,不对发表、转载的内容提供任何形式的保证。凡本网注明“来源:XXX网络”的作品,均转载自其它媒体,著作权归作者所有,商业转载请联系作者获得授权,非商业转载请注明出处。

我们尊重并感谢每一位作者,均已注明文章来源和作者。如因作品内容、版权或其它问题,请及时与我们联系,联系邮箱:809451989@qq.com,投稿邮箱:809451989@qq.com