结构
初始化
brpc的io事件分发器,使用多线程Reactor模式
通过 InitializeGlobalDispatchers 来初始化全局 io 事件分发器:分发器共分为 task_group_ntags 组,每组有 event_dispatcher_num 个事件分发器。
void InitializeGlobalDispatchers() {g_edisp = new EventDispatcher[FLAGS_task_group_ntags * FLAGS_event_dispatcher_num];for (int i = 0; i < FLAGS_task_group_ntags; ++i) {for (int j = 0; j < FLAGS_event_dispatcher_num; ++j) {bthread_attr_t attr =FLAGS_usercode_in_pthread ? BTHREAD_ATTR_PTHREAD : BTHREAD_ATTR_NORMAL;attr.tag = (BTHREAD_TAG_DEFAULT + i) % FLAGS_task_group_ntags;CHECK_EQ(0, g_edisp[i * FLAGS_event_dispatcher_num + j].Start(&attr));}}// This atexit is will be run before g_task_control.stop() because above// Start() initializes g_task_control by creating bthread (to run epoll/kqueue).CHECK_EQ(0, atexit(StopAndJoinGlobalDispatchers));
}
分发策略
// Returns the global event dispatcher that should serve `fd` for bthread tag
// `tag`, creating the dispatcher array on first use via pthread_once.
//
// When there is exactly one tag group with exactly one dispatcher, that sole
// dispatcher is returned. Otherwise the dispatcher is picked inside group
// `tag` by hashing `fd` with butil::fmix32 modulo the per-group count.
EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag) {
    pthread_once(&g_edisp_once, InitializeGlobalDispatchers);

    const bool only_one_dispatcher =
        FLAGS_task_group_ntags == 1 && FLAGS_event_dispatcher_num == 1;
    if (!only_one_dispatcher) {
        // Spread fds over the dispatchers that belong to this tag's group.
        int slot = butil::fmix32(fd) % FLAGS_event_dispatcher_num;
        return g_edisp[tag * FLAGS_event_dispatcher_num + slot];
    }
    return g_edisp[0];
}
读事件
对于acceptor,读事件处理函数为OnNewConnections
// For the acceptor, install OnNewConnections as the read-event handler.
options.on_edge_triggered_events = OnNewConnections;
连接建立后,新 socket 的读事件处理函数为 OnNewDataFromTcp(编译并启用了 RDMA 时)或者 OnNewMessages(其他情况)。
#if BRPC_WITH_RDMAif (am->_use_rdma) {options.on_edge_triggered_events = rdma::RdmaEndpoint::OnNewDataFromTcp;} else {
#else{
#endifoptions.on_edge_triggered_events = InputMessenger::OnNewMessages;}
写事件
非阻塞 connect 时,会调用事件分发器的 RegisterEvent 注册 EPOLLOUT 事件。
int Socket::Connect(const timespec* abstime,int (*on_connect)(int, int, void*), void* data) {if (_ssl_ctx) {_ssl_state = SSL_CONNECTING;} else {_ssl_state = SSL_OFF;}struct sockaddr_storage serv_addr;socklen_t addr_size = 0;if (butil::endpoint2sockaddr(remote_side(), &serv_addr, &addr_size) != 0) {PLOG(ERROR) << "Fail to get sockaddr";return -1;}butil::fd_guard sockfd(socket(serv_addr.ss_family, SOCK_STREAM, 0));if (sockfd < 0) {PLOG(ERROR) << "Fail to create socket";return -1;}CHECK_EQ(0, butil::make_close_on_exec(sockfd));// We need to do async connect (to manage the timeout by ourselves).CHECK_EQ(0, butil::make_non_blocking(sockfd));const int rc = ::connect(sockfd, (struct sockaddr*)&serv_addr, addr_size);if (rc != 0 && errno != EINPROGRESS) {PLOG(WARNING) << "Fail to connect to " << remote_side();return -1;}if (on_connect) {EpollOutRequest* req = new(std::nothrow) EpollOutRequest;if (req == NULL) {LOG(FATAL) << "Fail to new EpollOutRequest";return -1;}req->fd = sockfd;req->timer_id = 0;req->on_epollout_event = on_connect;req->data = data;// A temporary Socket to hold `EpollOutRequest', which will// be added into epoll device soonSocketId connect_id;SocketOptions options;options.bthread_tag = _bthread_tag;options.user = req;if (Socket::Create(options, &connect_id) != 0) {LOG(FATAL) << "Fail to create Socket";delete req;return -1;}// From now on, ownership of `req' has been transferred to// `connect_id'. 
We hold an additional reference here to// ensure `req' to be valid in this scopeSocketUniquePtr s;CHECK_EQ(0, Socket::Address(connect_id, &s));// Add `sockfd' into epoll so that `HandleEpollOutRequest' will// be called with `req' when epoll event reachesif (GetGlobalEventDispatcher(sockfd, _bthread_tag).RegisterEvent(connect_id, sockfd, false) !=0) {const int saved_errno = errno;PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll";s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s",(int)sockfd, berror(saved_errno));return -1;}// Register a timer for EpollOutRequest. Note that the timeout// callback has no race with the one above as both of them try// to `SetFailed' `connect_id' while only one of them can succeed// It also work when `HandleEpollOutRequest' has already been// called before adding the timer since it will be removed// inside destructor of `EpollOutRequest' after leaving this scopeif (abstime) {int rc = bthread_timer_add(&req->timer_id, *abstime,HandleEpollOutTimeout,(void*)connect_id);if (rc) {LOG(ERROR) << "Fail to add timer: " << berror(rc);s->SetFailed(rc, "Fail to add timer: %s", berror(rc));return -1;}}} else {if (WaitEpollOut(sockfd, false, abstime) != 0) {PLOG(WARNING) << "Fail to wait EPOLLOUT of fd=" << sockfd;return -1;}if (CheckConnected(sockfd) != 0) {return -1;}}return sockfd.release();
}