一、背景
(1)下面几个有关异步操作的例子:
a)客户端和服务端的异步关系,就是客户端发送请求后不需要等待结果,接下来发送其他请求。
b)对于服务端,客户端来请求后,服务端获得请求后去mySQl查询,然后在返回结果。在这里,处理请求、检测对应的io,读写数据和处理数据做到异步。
(2)以下4种都是同步的io,怎么理解?
read()读数据和返回数据是一起的,读数据就返回。读成功就把数据带回来,不成功返回的就是-1。读这个函数本身就是同步的。发起读请求和返回数据是合到一起的,io是同步的io。
write()发起写请求和把数据写进内核也是一起的。
recv()在发接收请求和接收数据也是一起的。
(3)异步io,是指比如原本同步的read()中读请求和数据返回没有做到一起,而是做成2个步骤。优点是使至少原本copy_to_user()的时间节省下来。
思路框架:某个操作为了做到异步,可以通过引入一个线程池/任务队列实现。第1步是把这个操作作为任务抛到任务队列里面。→ 第2步就是通过线程池(worker)从任务队列里面取任务执行。→ 第3步是把执行后的结果放入另一个队列里面。→ 第4步开启类似线程的东西(while循环)不断地从“complete queue”里面取结果就可以了。对外界而言就是,只是需要不断投放信息到"submit queue"就可以了。
参数介绍:
对于封装的a_read()有3个参数:fd,读数据放到哪个buffer里,还有这个buffer多长。打成task后放到submit queue里面,然后由另外的线程(worker)执行,执行出的结果放到complete queue里面,等待外部来取。
以上执行过程的2个问题:
a) 以上的操作避免不了频繁的把task任务copy到队列的过程,怎么处理?
答:我们可以把这块频繁分配的内存以mmaped的方式映射出来,就是不需要每次另外创建空间,而是调用定义的“read(fd, buffer, length)”这个函数通过 mmap 提前申请一块内存区域,将其直接映射到任务队列(submit queue和complete queue)所需的空间上,这样,队列的生产和消费操作不需要频繁分配和释放内存而是直接复用这块已映射的内存。
提前取出一块内存进行内存共享。
b) 队列怎么做到高效?对外界而言,要求就是做到线程安全,就是每个线程都可以往里面提交。而如果仅仅为了线程安全而采用频繁加锁的方式,效率肯定不高。
答:做成无锁的环形队列。比如submit queue设置出长度为100的环形队列。可以把提交的index对100取模(index%100)得到当前是把这个提交的任务存储到了第几个,因此构建成了一个环(虽然存储空间依然是连续的,但在执行的逻辑上面变成了1个环)。(传统队列通常需要锁来保护队列整体,因为插入操作和取出操作容易产生访问重叠的区域;而环形队列的插入操作(写入任务)和 取出操作(读取任务)互不干扰,同时不需要因为队列数量的变化导致重新分配存储空间)。
(4)io-uring的原理,内核在19年版本里为io-uring新增了3个系统调用。
先通过submit queue和complete queue把环境构建起来,然后在submit queue里一次一次的把事件setup,然后把事件register,当register所有事件后用enter把信息输入(push)进worker里处理。
a). io_uring_setup
b). io_uring_enter
c). io_uring_register
b). 把以上3个系统调用封装起来的库——liburing,一般就用这个库。
io-uring和liburing的关系,可以理解为io_uring提供裸接口,liburing是包装好的库。
(5)io-uring里的u是指user,指用户空间提供的ring。但此处都已经是系统调用了,为什么还是用户空间的ring?
答: register提供事件的时候,用户空间和内存空间是同一个内存,通过mmap方式建立。是针对用户空间对io操作的环形队列。
二、实现io_uring + tcpserver
io_uring能为开发做什么?
答:
2.1 io_uring安装
(1)linux内核版本最好大于5.4
(2)安装命令:
git clone https://github.com/axboe/liburing.git
cd liburing
make
sudo make instal
2.2 代码实现
2.2.1 流程
1)server初始化,到listen()为止
2)构建submit squeue,拿到submit squeue头指针
3)通过io_uring_prep_accept实现accept()请求的异步,向submit squeue增加节点。
4)while循环:
(a)通过io_uring_submit提交任务到worker;
(b)通过io_uring_wait_cqe拿到complete squeue头指针,然后看worker里有没有之前的结果
(c)通过io_uring_peek_batch_cqe把结果complete squeue里的结果带出来,功能类似epoll_wait。之后的for循环也是类似epoll_wait的返回值操作
(d)针对for循环的结果处理
2.2.2 第一次io_uring代码——构建出io_uring可以正常运行的代码框架
#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>int init_server(unsigned short port) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(port); if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) { perror("bind"); return -1; } listen(sockfd, 10);return sockfd;
}#define ENTRIES_LENGTH 1024
//#define BUFFER_LENGTH 1024int main(int argc, char *argv[]) {unsigned short port = 9999;int sockfd = init_server(port);// 创建一个uring参数struct io_uring_params params;memset(¶ms, 0, sizeof(params));// 创建一个uringstruct io_uring ring;
// 初始化一个uring,把submit queue和complete queue这两个队列构建起来(已执行了io_uring_setup)io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);
// 拿到submit queue环形队列的头指针(已执行了io_uring_setup)struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);#if 0struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr);accept(sockfd, (struct sockaddr*)&clientaddr, &len);
#elsestruct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr);
// 向submit queue里增加1个节点(已执行了io_uring_register)io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
// set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);#endifwhile(1){
// 把所有事件一起提交给worker(已执行了io_uring_enter)io_uring_submit(&ring);printf("Before");// complete queue查看有无woker完成的结果struct io_uring_cqe *cqe;// 拿到complete queue环形队列头指针io_uring_wait_cqe(&ring, &cqe);printf("After");struct io_uring_cqe *cqes[128];
// 取complete queue内部的节点,效果类似epoll_wait()int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);int i = 0;for (i = 0; i < nready; i++){printf(" io_uring_peek_batch_cqe\n");// for循环的结果struct io_uring_cqe *entries = cqes[i];int clientfd = entries->res; printf("clientfd: %d\n", clientfd);}// 清空cqe里已经运行过的结果io_uring_cq_advance(&ring, nready);}}
2.2.2 第二次io_uring代码——可以持续进行连接请求
代码逻辑:首先初始化1个server,初始化1个ring,首次执行的时候通过设置set_event_accept往submit queue里增加1个节点,下面进入while循环,循环里在每次accept以后在事件处理完后清空,再次通过set_event_accept增加1个节点。
提问1:sq(submit sequeue)的entry(节点)与cq(complete sequeue)的entry有什么关系?
答:sq和cq共用一块内存,但是是同一块内存的两个独立区域,彼此互不干扰。
提问2:io_uring_cq_advance为什么没把sqe里设置的set_event_accept()内数据清空?
答:io_uring_cq_advance只清空complete sequeue的数据。
提问3:io_uring和epoll区别?
答:epoll在每次设置完后可以等待时间触发,不需要进一步修改就会持续有事件处理;而io_uring是每次处理完事件后需要再次设置后才能处理下一个事件。在设计思路上,就是reactor和proactor的区别。
#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>#define EVENT_ACCEPT 0
#define EVENT_READ 1
#define EVENT_WRITE 2struct conn_info{ // sizeof()是8,原因是int 是4,2个int是8int fd;int event;
};int init_server(unsigned short port) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(port); if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) { perror("bind"); return -1; } listen(sockfd, 10);return sockfd;
}#define ENTRIES_LENGTH 1024int set_event_accept(struct io_uring *ring, int fd, struct sockaddr *addr,socklen_t *addrlen, int flags){struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);struct conn_info accept_info = {.fd = fd, // .fd 明确指定成员并初始化它的值.event = EVENT_ACCEPT, // .event 明确指定成员并初始化它的值};io_uring_prep_accept(sqe, fd, (struct sockaddr*)addr, addrlen, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}int main(int argc, char *argv[]) {unsigned short port = 9999;int sockfd = init_server(port);struct io_uring_params params;memset(¶ms, 0, sizeof(params));struct io_uring ring;io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr);set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);while(1){io_uring_submit(&ring);printf("Before");struct io_uring_cqe *cqe;io_uring_wait_cqe(&ring, &cqe);printf("After");struct io_uring_cqe *cqes[128];int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);int i = 0;for (i = 0; i < nready; i++){printf(" io_uring_peek_batch_cqe\n");struct io_uring_cqe *entries = cqes[i];struct conn_info result;memcpy(&result, &entries->user_data, sizeof(struct conn_info));if (result.event == EVENT_ACCEPT){set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);printf("set_event_accept\n");}}io_uring_cq_advance(&ring, nready);}}
2.2.3 第三次io_uring代码——可以收发数据
#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>
#include <unistd.h>#define EVENT_ACCEPT 0
#define EVENT_READ 1
#define EVENT_WRITE 2struct conn_info{ // sizeof()是8,原因是int 是4?2个int是8int fd;int event;
};int init_server(unsigned short port) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(port); if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) { perror("bind"); return -1; } listen(sockfd, 10);return sockfd;
}#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,socklen_t *addrlen, int flags){struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);struct conn_info accept_info = {.fd = sockfd, // .fd 明确指定成员并初始化它的值.event = EVENT_ACCEPT, // .event 明确指定成员并初始化它的值};io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);struct conn_info accept_info = {.fd = sockfd, // .fd 明确指定成员并初始化它的值.event = EVENT_READ, // .event 明确指定成员并初始化它的值};io_uring_prep_recv(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}int set_event_send(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags){struct io_uring_sqe *sqe = io_uring_get_sqe(&ring);struct conn_info accept_info = {.fd = sockfd, // .fd 明确指定成员并初始化它的值.event = EVENT_WRITE, // .event 明确指定成员并初始化它的值};io_uring_prep_send(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}int main(int argc, char *argv[]) {unsigned short port = 9998;int sockfd = init_server(port);struct io_uring_params params;memset(¶ms, 0, sizeof(params));struct io_uring ring;io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr);set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);char buffer[BUFFER_LENGTH] = {0};while(1){io_uring_submit(&ring);printf("Before");struct io_uring_cqe *cqe;io_uring_wait_cqe(&ring, &cqe);printf("After");struct io_uring_cqe *cqes[128];int nready = io_uring_peek_batch_cqe(&ring, cqes, 128);int i = 0;for (i = 0; i < nready; i++){printf(" io_uring_peek_batch_cqe\n");struct io_uring_cqe *entries = cqes[i];struct conn_info result;memcpy(&result, &entries->user_data, sizeof(struct conn_info));if (result.event == EVENT_ACCEPT){set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);printf("set_event_accept\n");int connfd = entries->res;set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);//收数据 } else if (result.event == EVENT_READ){int ret = entries->res;printf("set_event_recv ret: %d %s\n", ret, buffer);if (ret == 0){close(result.fd);} else if (ret > 0){//保证了连续发送set_event_send(&ring, result.fd, buffer, BUFFER_LENGTH, 0); } else if (result.event == EVENT_WRITE){int ret = entries->res; printf("set_event_send ret: %d %s\n", ret, buffer);set_event_send(&ring, result.fd, buffer, BUFFER_LENGTH, 0); }}io_uring_cq_advance(&ring, nready);}}
2.3 报错备注:
(1)通过“htop”看到有cup达到100%,很有可能是while()进入死循环。
(2)reactor和proactor的区别是什么?至少找出3点区别
答:reactor是一个事件对应一个动作,proactor是提交事件后结果直接给出来了。
五、面试:
(1)到现在即使学完了,却发现不知道怎么用,会有混沌的状态,有关面试的问题:
a)面试的时候,社招是投50份简历大概有4~5份面试机会,有1~2个offer。校招是投200份简历该带有15~20个面试机会,有1~2个offer。比如面试十几次没过,说明技术水平有问题。
b)面试时间。如果面试低于30分钟,面试成功的概率低于5%,面试时间低于45分钟,面试成功概率小于30%。
c)所以提高面试时间、面试结束要复盘(面了什么问题,你是怎么表述的)。
(2)如果UDP的开发如何做?说不清楚可以通过github的开源代码进行梳理,然后找老师讨论。总结到技术文章里。
(3)TCP和UDP的区别?
答:我们有5点:<1>第1个TCP基于连接的,UDP是基于数据报的。<2>第2点在分包和粘包的解决方案上是不一样的。TCP可以通过2种方式,1种方式是通过在TCP数据包的前面加上一个长度,另一种可以做分隔符。而UDF必须为每个包加上一个id值。<3>第3点在并发的方式上面也是不一样的,TCP可以直接通过epoll直接管理io就可以,而对于UDP我们需要通过模拟TCP的方式建立连接。<4>第4点在使用场景上面,UDF是做实时性比较强的方面,比如实时对战的游戏。还有就是在数据下载的使用场景下,UDP由于不带拥塞控制可以把整个网络耗尽加快下载速度,比如迅雷会员。<5>udp更偏向于短连接,TCP适合长连接。(解释:TCP基于连接,UDP是基于数据包。(a)TCP情况下server有一个连接、客户端有一个连接,所以连接是一对一的,服务端通过send(),客户端通过recv(fd);(b)TCP像水管一样连接,所以是有顺序的,就是先发的先收到。而UDP不一样,UDP没有顺序,需要自己为包定义一个id和明确确认机制,以此知道这是第几个包。(c)并发上的区别,UDP在做并发上面需要模拟TCP从而做到并发建立连接,这是为了更好的管理每个io。就是每个客户建立连接,然后通过其他的fd把这连接发出来,从而构建了与TCP不同的方式。(d)使用场景的区别,UDP的2个优势TCP没有的,第1是实时性,TCP为了保证顺序所以做了延迟确认,而UDP可以快速确认每个包。或者第2个是下载传输数据的情况,因为UDP是不带拥塞控制的,可以把整个网络耗尽加快下载速度,比如迅雷会员。(e)udp更偏向于短连接,特别是发起一次请求,比如发起DNS请求然后返回ip不需要建立连接只要发包收包回包就可以了。TCP适合长连接,就是建立连接一直存在。)
(4)TCP的分包和粘包的解决办法?
答:(a)基于TCP是有顺序的,把TCP数据包协议头的前面加几个字节当做长度。(b)做一个分隔符,比如在一个字节发完后加上"\r\n",对方在读的时候每次在读完后就判断有没有这个分隔符,从而做一个包的切割。而UDP在发送数据的时候需要我们在用户空间上自己为包定义一个id,而且要明确确认机制,以此知道这是第几个包。
(5)网络部分怎么学?
答:除了网卡驱动没有讲,从网络协议栈、posix api、应用层的多线程、多进程、客户端。(1)总结。(2)把代码调通。
六、 随想或总结
(1)mmp提前申请一块内容,用来避免频繁内存分配。
(2)环形队列以取模(index%环形长度)的方式建立,作用是(a) 通过头指针(head)和尾指针(tail)使 取出操作(读取任务)和 插入操作(写入任务)和互不干扰,提高线程安全;(b) 避免因队列大小变化导致的反复的内存分配和内存释放。