文章目录
- io_uring简介
- io_uring原理
- io_uring工作流程
- io_uring应用
- 测试io_uring和epoll
- 如果您对我的文章感兴趣,请关注,点赞,收藏,评论,感谢支持!!!
io_uring简介
说到高性能网络编程,我们常常能想到epoll,但是io_uring的出现效果又比epoll要更好了。
io_uring是一个Linux内核的异步I/O框架,它提供了高性能的异步I/O操作,io_uring的目标是通过减少系统调用和上下文切换的开销来提高I/O操作的性能。如果对于网络IO的类型不是很清晰可以参考这篇文章:理解一下5种IO模型、阻塞IO和非阻塞IO、同步IO和异步IO
epoll本身是一个阻塞IO,它主要解决进程/线程切换,进程/线程单连接问题,实现了一个进程/线程对多个端口的监听,同时是克服了select每次循环需要向内核传递数据造成性能开销大的问题。
io_uring采样异步IO,相比于epoll可以具有更好的系统吞吐能力和响应速度。
从性能的角度来说,io_uring的确更优于epoll,io_uring也更加适合高并发,低延迟的场景,但是其实很多中小型的项目使用epoll也可以满足。epoll在使用起来也更加的简单高效,io_uring使用起来会更加的复杂,并且io_uring 是从 Linux 5.1 开始引入的,许多服务器或生产环境中的系统内核版本较低,可能还未支持 io_uring。
io_uring原理
Submission Queue(SQ) 一整块连续的内存空间存储的环形队列。 用于存放将执行操作的数据。
Completion Queue(CQ) 一整块连续的内存空间存储的环形队列。 用于存放完成操作返回的结果。
Submission Queue Entry(SQE) 提交队列中的一项。
Completion Queue Entry(CQE) 完成队列中的一项。
SQ个队列中的保存的主要是指针或者编号(index),真正的IO请求,保存在一个基于数组结构的环形队列中
CQ和CQE也是类似的对应关系。
如果是epoll_wait检测到有数据send/read,需要从用户态切换到内核态,io_uring因为存在和内核有共享区域,减少了系统调用和上下文切换的开销。
io_uring工作流程
前面介绍了io_uring的原理,在开销这块要优于epoll,接下来我们介绍工作流程:
这里简化了部分内容,把从SQE和CQE获取内容的部分省略,因为已经封装起来了。
总结一下大致步骤:
第一步:应用程序通过向 io_uring 的 SQ 提交 I/O 操作。
第二步:SQ内核线程从 SQ 中读取 I/O 操作。
第三步:SQ内核线程发起 I/O 请求。
第四步:I/O 请求完成后,SQ内核线程会将 I/O 请求的结果写入到 io_uring 的 CQ 中。
第五步:应用程序可以通过从 CQ 中读取到 I/O 操作的结果。
注:SQ,CQ存的是元数据,并不是实际数据。
io_uring应用
在io_uring最主要的是下面三个函数:
io_uring_setupio_uring_enterio_uring_register
但是我们并不会直接去使用这几个函数,而是使用liburing提供的函数来实现功能,而liburing的函数并不止这几个。
entries 代表 queue depth。要创建的sqe的数量
struct io_uring_params {__u32 sq_entries; //SQ的长度__u32 cq_entries; //CQ的长度__u32 flags;__u32 sq_thread_cpu;__u32 sq_thread_idle;__u32 resv[5];struct io_sqring_offsets sq_off;struct io_cqring_offsets cq_off;
};
io_uring_params参数区分输入参数和输出参数// io_uring 结构体中包含需要使用到的 SQ和CQ ,以及需要关联的文件FD, 和相关的配置参数falgs;
struct io_uring {struct io_uring_sq sq;struct io_uring_cq cq;unsigned flags;int ring_fd;
};struct io_uring_sq {unsigned *khead;unsigned *ktail;unsigned *kring_mask;unsigned *kring_entries;unsigned *kflags;unsigned *kdropped;unsigned *array;struct io_uring_sqe *sqes;unsigned sqe_head;unsigned sqe_tail;size_t ring_sz;void *ring_ptr;....
};struct io_uring_cq {unsigned *khead;unsigned *ktail;unsigned *kring_mask;unsigned *kring_entries;unsigned *koverflow;struct io_uring_cqe *cqes;size_t ring_sz;void *ring_ptr;....
};struct io_uring_sqe {__u8 opcode; /* type of operation for this sqe */__u8 flags; /* IOSQE_ flags */__u16 ioprio; /* ioprio for the request */__s32 fd; /* file descriptor to do IO on */__u64 off; /* offset into file */__u64 addr; /* pointer to buffer or iovecs */__u32 len; /* buffer size or number of iovecs */union {__kernel_rwf_t rw_flags;__u32 fsync_flags;__u16 poll_events;__u32 sync_range_flags;__u32 msg_flags;};__u64 user_data; /* data to be passed back at completion time */union {__u16 buf_index; /* index into fixed buffers, if used */__u64 __pad2[3];};
};struct io_uring_cqe {__u64 user_data; /* sqe->user_data submission passed back */__s32 res; /* result code for this event */__u32 flags;
};
在liburing中,我们常常使用下面的函数来实现一些功能:
//用于初始化 io_uring 队列并允许用户通过 参数配置来定制行为
int io_uring_queue_init_params(unsigned entries,struct io_uring *ring,struct io_uring_params *params);// 用户初始化 io_uring。该方法中包含了内存空间的初始化以及mmap 调用,entries:队列深度
int io_uring_queue_init(unsigned entries, struct io_uring *ring, unsigned flags);// 为了提交IO请求,需要获取里面queue的一个空闲项
struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring);//用sockfd来开始接受套接字描述的连接请求,地址存在addr中
void io_uring_prep_accept(struct io_uring_sqe *sqe,int sockfd,struct sockaddr *addr,socklen_t *addrlen,int flags);//返回CQE的数量,相当于epoll_wait
unsigned io_uring_peek_batch_cqe(struct io_uring *ring, struct io_uring_cqe **cqes, unsigned count);//接收数据
void io_uring_prep_recv(struct io_uring_sqe *sqe,int sockfd,void *buf,size_t len,int flags);//发送数据
void io_uring_prep_send(struct io_uring_sqe *sqe,int sockfd,const void *buf,size_t len,int flags);
//将内部完成队列 (CQ) 的读指针向前推进 nr 个条目,表示用户空间已经处理了这些完成事件。
void io_uring_cq_advance(struct io_uring *ring,unsigned nr);// 非系统调用,准备阶段,和libaio封装的io_prep_writev一样
void io_uring_prep_writev(struct io_uring_sqe *sqe, int fd,const struct iovec *iovecs, unsigned nr_vecs, off_t offset)// 非系统调用,准备阶段,和libaio封装的io_prep_readv一样
void io_uring_prep_readv(struct io_uring_sqe *sqe, int fd, const struct iovec *iovecs, unsigned nr_vecs, off_t offset)// 提交sq的entry,不会阻塞等到其完成,内核在其完成后会自动将sqe的偏移信息加入到cq,在提交时需要加锁
int io_uring_submit(struct io_uring *ring);// 提交sq的entry,阻塞等到其完成,在提交时需要加锁。
int io_uring_submit_and_wait(struct io_uring *ring, unsigned wait_nr);//等待cqe
int io_uring_wait_cqe(struct io_uring *ring,struct io_uring_cqe **cqe_ptr);// 非系统调用 遍历时,可以获取cqe的data
void *io_uring_cqe_get_data(const struct io_uring_cqe *cqe)// 清理io_uring
void io_uring_queue_exit(struct io_uring *ring);
接下来的代码,我们来实现一个由io_uring写的服务器
#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>#define EVENT_ACCEPT 0
#define EVENT_READ 1
#define EVENT_WRITE 2//user_data 字段是用户定义的值,用于标识或携带与此提交请求相关的信息。
//一个 64 位的用户态数据,内核不会修改,直接从提交到完成传递。
struct conn_info {int fd;int event;
};int init_server(unsigned short port) { int sockfd = socket(AF_INET, SOCK_STREAM, 0); struct sockaddr_in serveraddr; memset(&serveraddr, 0, sizeof(struct sockaddr_in)); serveraddr.sin_family = AF_INET; serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); serveraddr.sin_port = htons(port); if (-1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) { perror("bind"); return -1; } listen(sockfd, 10);return sockfd;
}#define ENTRIES_LENGTH 1024
#define BUFFER_LENGTH 1024int set_event_recv(struct io_uring *ring, int sockfd,void *buf, size_t len, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = EVENT_READ,};io_uring_prep_recv(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));}int set_event_send(struct io_uring *ring, int sockfd,void *buf, size_t len, int flags) {struct io_uring_sqe *sqe = io_uring_get_sqe(ring);struct conn_info accept_info = {.fd = sockfd,.event = EVENT_WRITE,};io_uring_prep_send(sqe, sockfd, buf, len, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));}//ACCEPT 是一次性操作
int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr,socklen_t *addrlen, int flags) {//获取sqe空闲项//获取SQE的时机,每一次I/O都需要申请,当准备发数据时申请,当准备读数据时申请struct io_uring_sqe* sqe = io_uring_get_sqe(ring);//设置user_data的值struct conn_info accept_info = {.fd = sockfd,.event = EVENT_ACCEPT,};//acceptio_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));}int main(int argc, char *argv[]) {unsigned short port = 9999;int sockfd = init_server(port);struct io_uring_params params;memset(¶ms, 0, sizeof(params));struct io_uring ring;io_uring_queue_init_params(ENTRIES_LENGTH, &ring, ¶ms);struct sockaddr_in clientaddr; socklen_t len = sizeof(clientaddr);//第一次acceptset_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);char buffer[BUFFER_LENGTH] = {0};while (1) {// 提交sq的entry,不会阻塞等到其完成io_uring_submit(&ring); //runstruct io_uring_cqe* cqe;//阻塞等待至少一个事件io_uring_wait_cqe(&ring, &cqe); //如果为空,等待一个cqestruct io_uring_cqe* cqes[128];//检查所有可用事件int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // epoll_waitint i = 0;for (i = 0;i < nready;i ++) {struct io_uring_cqe *entries = cqes[i];struct conn_info result;memcpy(&result, &entries->user_data, sizeof(struct conn_info));if (result.event == EVENT_ACCEPT) {//重新提交新的 ACCEPT 操作,监听下一个连接。set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);//printf("set_event_accept\n"); ////内核处理结果放在res中,成功:res 是新连接的文件描述符;失败:res 是负值int connfd = entries->res;set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);} else if (result.event == EVENT_READ) { ////对于read来说这个值是读取字节数int ret = entries->res;//printf("set_event_recv ret: %d, %s\n", ret, buffer); //if (ret == 0) {close(result.fd);} else if (ret > 0) {set_event_send(&ring, result.fd, buffer, ret, 0);}} else if (result.event == EVENT_WRITE) {////对于write来说是发送字节数int ret = entries->res;//printf("set_event_send ret: %d, %s\n", ret, buffer);set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);}}io_uring_cq_advance(&ring, nready);}}
相比较于epoll,io_uring的代码还是会更难以理解一些。大致步骤如下:
1,初始化服务器,返回服务器的sockfd,并且监听连接;
2,创建SQ,CQ和初始化params;
3,如果有连接,则accept;申请一个SQE(在io_uring中想要IO操作都得申请SQE),设置use_data的值(user_data固定8字节,由SQE创建,CQE不改变值)
4,
接收连接请求(这里需要申请SQE,一方面是涉及IO都需要申请,另一方面这里申请的SQE可以用于下一轮的提交SQ操作)
while(1)
{
提交SQ
等待CQ中至少有一个事件
检查CQ中存在多少事件for(; ; ) //遍历每一个CQE{if accept因为accept是一次性操作,设置下一次监听的连接请求的接收获取clientfd准备接收数据(这里需要申请SQE,一方面是涉及IO都需要申请,另一方面这里申请的SQE可以用于下一轮的提交SQ操作)else if read读取字节数发送数据else if write发送字节数读取数据}更新CQ
}
测试io_uring和epoll
从理论上来讲io_uring的性能是要优于epoll的,那么我们就来测试一下在百万连接的情况下,各自的性能如何
下面是测试用的代码
#include <stdio.h>
#include <string.h>
#include <sys/socket.h>
#include <stdlib.h>
#include <unistd.h>#include <sys/time.h>
#include <pthread.h>
#include <arpa/inet.h>#include <unistd.h>
#include <getopt.h>typedef struct test_context_s
{char serverip[16];int port;int threadnum;int connection;int requestion;int failed;
} test_context_t;int connect_tcpserver(const char *ip, unsigned short port)
{int connfd = socket(AF_INET, SOCK_STREAM, 0);struct sockaddr_in tcpserver_addr;memset(&tcpserver_addr, 0, sizeof(struct sockaddr_in));tcpserver_addr.sin_family = AF_INET;tcpserver_addr.sin_addr.s_addr = inet_addr(ip);tcpserver_addr.sin_port = htons(port);int ret = connect(connfd, (struct sockaddr *)&tcpserver_addr, sizeof(struct sockaddr_in));if (ret){perror("connect");return -1;}return connfd;
}
#define TIME_SUB_MS(tv1, tv2) ((tv1.tv_sec - tv2.tv_sec) * 1000 + (tv1.tv_usec - tv2.tv_usec) / 1000)#define TEST_MESSAGE "ABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890abcdefghijklmnopqrstuvwxyz\r\n"
#define RBUFFER_LENGTH 2048#define WBUFFER_LENGTH 2048int send_recv_tcppkt(int fd)
{char wbuffer[WBUFFER_LENGTH] = {0};int i = 0;for (i = 0; i < 8; i++){strcpy(wbuffer + i * strlen(TEST_MESSAGE), TEST_MESSAGE);}int res = send(fd, wbuffer, strlen(wbuffer), 0);if (res < 0){exit(1);}char rbuffer[RBUFFER_LENGTH] = {0};res = recv(fd, rbuffer, RBUFFER_LENGTH, 0);if (res <= 0){exit(1);}if (strcmp(rbuffer, wbuffer) != 0){printf("failed: '%s' != '%s'\n", rbuffer, wbuffer);return -1;}return 0;
}static void *test_qps_entry(void *arg)
{test_context_t *pctx = (test_context_t *)arg;int connfd = connect_tcpserver(pctx->serverip, pctx->port);if (connfd < 0){printf("connect_tcpserver failed\n");return NULL;}int count = pctx->requestion / pctx->threadnum;int i = 0;int res;while (i++ < count){res = send_recv_tcppkt(connfd);if (res != 0){printf("send_recv_tcppkt failed\n");pctx->failed++; //continue;}}return NULL;
}int main(int argc, char *argv[])
{test_context_t ctx = {0};int opt;while ((opt = getopt(argc, argv, "s:p:t:c:n:?")) != -1){switch (opt){case 's':printf("-s: %s\n", optarg);strcpy(ctx.serverip, optarg);break;case 'p':printf("-p: %s\n", optarg);ctx.port = atoi(optarg);break;case 't':printf("-t: %s\n", optarg);ctx.threadnum = atoi(optarg);break;case 'c':printf("-c: %s\n", optarg);ctx.connection = atoi(optarg);break;case 'n':printf("-n: %s\n", optarg);ctx.requestion = atoi(optarg);break;default:return -1;}}pthread_t *ptid = malloc(ctx.threadnum * sizeof(pthread_t));int i = 0;struct timeval tv_begin;gettimeofday(&tv_begin, NULL);for (i = 0; i < ctx.threadnum; i++){pthread_create(&ptid[i], NULL, test_qps_entry, &ctx);}for (i = 0; i < ctx.threadnum; i++){pthread_join(ptid[i], NULL);}struct timeval tv_end;gettimeofday(&tv_end, NULL);int time_used = TIME_SUB_MS(tv_end, tv_begin);printf("success: %d, failed: %d, time_used: %d, qps: %d\n", ctx.requestion - ctx.failed,ctx.failed, time_used, ctx.requestion * 1000 / time_used);free(ptid);return 0;
}
这是使用io_uring进行客户端百万连接并且发送和接收数据的情况
这是epoll在百万连接并发送数据和接收数据的情况
从结果可以看出,io_uring的确在性能上要更优。
如果您对我的文章感兴趣,请关注,点赞,收藏,评论,感谢支持!!!
您的支持是我不断创作的源动力。