云计算百科
云计算领域专业知识百科平台

Linux高性能异步io,io_uring实现服务器

Linux系统中io_uring是2019年首次出现的一个新的异步IO框架,用于实现高效的异步IO操作。

1. io_uring为什么高效
  • 相较于常用的同步io,read/write/accpet等,io_uring使用两个队列来实现io操作,一个是提交队列Submission Queue(SQ),另一个是完成队列Completion Queue(CQ),SQ存放用户提交的io请求,CQ存放内核io请求完成后的结果,发起请求后不需要阻塞等待io操作的结果,实现真正的异步。
  • 使用两个队列会带来两个问题,一个是任务用户空间和两个队列之间任务的频繁拷贝带来的性能损耗,另一个是线程安全问题。
    • 性能问题:io_uring创建时会使用mmap做内存映射,用户空间和内核空间共享一块内存;
    • 线程安全:io_uring使用环形队列,并且队列都使用原子型的头尾指针来管理队列中的条目;
  • io_uring 支持批量提交 I/O 请求和批量获取完成结果,这减少了上下文切换和系统调用的开销,提高了效率。
2. io_uring如何使用
  • Linux内核版本需要大于5.4,以更好地支持io_uring

  • io_uring的三个系统调用

    int io_uring_setup(u32 entries, struct io_uring_params *p);

    • 初始化一个 io_uring 实例。它在内核中创建和配置 io_uring 数据结构,并返回一个文件描述符,用于与该实例进行交互。

    int io_uring_register(unsigned int fd, unsigned int opcode, void *arg, unsigned int nr_args);

    • 用于向 io_uring 实例注册各种资源,如文件描述符、缓冲区、事件文件描述符等。这些注册操作可以提高 I/O 操作的效率,因为它们允许 io_uring 在执行 I/O 操作时直接访问这些预先注册的资源,而不需要在每次操作时重新设置。

    int io_uring_enter(unsigned int fd, unsigned int to_submit, unsigned int min_complete, unsigned int flags, sigset_t *sig);

    • 用于提交和等待 io_uring 队列中的 I/O 操作。这个系统调用允许用户提交新的 I/O 请求,并等待这些请求的完成,或者仅提交请求而不等待。
  • 原始系统调用比较抽象,可以借助liburing来使用异步io,在Linux 5.4以上使用以下命令进行安装,低版本可能可以安装,但是不能正常运行。

    git clone https://github.com/axboe/liburing.git cd liburing ./configure make sudo make install

3. 使用liburing常用API构建异步服务器

int io_uring_queue_init_params(unsigned entries, struct io_uring *ring,struct io_uring_params *p);

  • 初始化一个io_uring实例,并指定SQ和CQ中的条目数,传递io_uring的配置参数,该函数使用了系统调用io_uring_setup。

struct io_uring_sqe *io_uring_get_sqe(struct io_uring *ring)

  • 获取SQ中第一个空io_uring_sqe结构体,后续用户可以填充io请求并提交给io_uring。

io_uring_prep_accept(struct io_uring_sqe *sqe, int fd, struct sockaddr *addr, socklen_t *addrlen, int flags)

  • 准备一个 accept 系统调用的提交队列条目,sqe是指向要填充的 io_uring_sqe 结构体的指针,后续参数同accept4,非阻塞调用,只提交请求,不等待返回结果,对于同步的send/recv也有对应的io_uring_prep_send/io_uring_prep_recv,底层使用io_uring_register系统调用。

int io_uring_submit(struct io_uring *ring);

  • 将提交队列条目(io_uring_sqe)一次性提交到内核进行处理。这些请求会被内核调度并执行,执行完成后,结果会被放入CQ中,底层使用io_uring_enter系统调用。

int io_uring_wait_cqe(struct io_uring *ring, struct io_uring_cqe **cqe_ptr)

  • 阻塞等待CQ中的条目,结果存入cqe_ptr。

unsigned io_uring_peek_batch_cqe(struct io_uring *ring,struct io_uring_cqe **cqes, unsigned count);

  • 批量获取CQ中的条目,支持一次性返回多个,可以类比epoll_wait,非阻塞。

void io_uring_cq_advance(struct io_uring *ring, unsigned nr)

  • 通知内核有多少个CQ中的条目已经被处理完成,可以被io_ring重用,在每次调用io_uring_peek_batch_cqe后,必须调用io_uring_cq_advance。

以下是完整实现代码:

#include <stdio.h>
#include <liburing.h>
#include <netinet/in.h>
#include <string.h>
#include <unistd.h>

#define EVENT_ACCEPT 0
#define EVENT_READ1
#define EVENT_WRITE2

#define ENTRIES_LENGTH1024
#define BUFFER_LENGTH1024

struct conn_info {
int fd;
int event;
};

int init_server(unsigned short port) {

int sockfd = socket(AF_INET, SOCK_STREAM, 0);
struct sockaddr_in serveraddr;
memset(&serveraddr, 0, sizeof(struct sockaddr_in));
serveraddr.sin_family = AF_INET;
serveraddr.sin_addr.s_addr = htonl(INADDR_ANY);
serveraddr.sin_port = htons(port);

if (1 == bind(sockfd, (struct sockaddr*)&serveraddr, sizeof(struct sockaddr))) {
perror("bind");
return 1;
}

listen(sockfd, 10);

return sockfd;
}

int set_event_recv(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) {

struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_READ,
};

io_uring_prep_recv(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}

int set_event_send(struct io_uring *ring, int sockfd, void *buf, size_t len, int flags) {

struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_WRITE,
};

io_uring_prep_send(sqe, sockfd, buf, len, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}

int set_event_accept(struct io_uring *ring, int sockfd, struct sockaddr *addr, socklen_t *addrlen, int flags) {

struct io_uring_sqe *sqe = io_uring_get_sqe(ring);

struct conn_info accept_info = {
.fd = sockfd,
.event = EVENT_ACCEPT,
};

io_uring_prep_accept(sqe, sockfd, (struct sockaddr*)addr, addrlen, flags);
memcpy(&sqe->user_data, &accept_info, sizeof(struct conn_info));
}

int main(int argc, char *argv[]) {

unsigned short port = 9999;
int sockfd = init_server(port);

struct io_uring_params params;
memset(&params, 0, sizeof(params));

struct io_uring ring;
io_uring_queue_init_params(ENTRIES_LENGTH, &ring, &params);

struct sockaddr_in clientaddr;
socklen_t len = sizeof(clientaddr);
set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);

char buffer[BUFFER_LENGTH] = {0};

while (1) {

io_uring_submit(&ring);

struct io_uring_cqe *cqe;
io_uring_wait_cqe(&ring, &cqe);

struct io_uring_cqe *cqes[128];
int nready = io_uring_peek_batch_cqe(&ring, cqes, 128); // epoll_wait

int i = 0;
for (i = 0;i < nready;i ++) {

struct io_uring_cqe *entries = cqes[i];
struct conn_info result;
memcpy(&result, &entries->user_data, sizeof(struct conn_info));

if (result.event == EVENT_ACCEPT) {

set_event_accept(&ring, sockfd, (struct sockaddr*)&clientaddr, &len, 0);
int connfd = entries->res;
set_event_recv(&ring, connfd, buffer, BUFFER_LENGTH, 0);
} else if (result.event == EVENT_READ) {

int ret = entries->res;
if (ret == 0) {
close(result.fd);
} else if (ret > 0) {
set_event_send(&ring, result.fd, buffer, ret, 0);
}
} else if (result.event == EVENT_WRITE) {

int ret = entries->res;
set_event_recv(&ring, result.fd, buffer, BUFFER_LENGTH, 0);
}

}

io_uring_cq_advance(&ring, nready);
}
}

4. io_uring机制对比epoll机制
  • epoll通过epoll_ctl增加关注的事件之后,只要不删除,下一次执行epoll_wait时fd发生事件仍然能够返回,但是io_uring注册事件并返回事件后,如果需要继续关注事件,则需要重新注册;
  • epoll_wait返回事件时,是检测到io事件发生,用户层仍然需要对io进行具体的操作,是reactor模式;io_uring的CQ返回时,io操作已经完成,是proactor模式;
  • io_uring可以一次性提交多个io请求,减少系统调用次数;

最后,推荐一个Linux c/c++内容学习平台

https://github.com/0voice

赞(0)
未经允许不得转载:网硕互联帮助中心 » Linux高性能异步io,io_uring实现服务器
分享到: 更多 (0)

评论 抢沙发

评论前必须登录!