深入理解Linux的五种IO模型 您所在的位置:网站首页 典型事件法的优缺点 深入理解Linux的五种IO模型

深入理解Linux的五种IO模型

2023-03-26 02:26| 来源: 网络整理| 查看: 265

Linux系统为我们提供了五种可用的IO模型,分别是阻塞式IO、非阻塞式IO、IO多路复用、信号驱动式IO和异步IO。这些模型的作用是让应用程序能够更好地管理和处理输入输出操作。

阻塞式IO:简单易用,但效率不高。非阻塞式IO:可以让应用程序在等待数据准备完成的过程中执行其他操作,但需要应用程序不断轮询内核缓冲区。IO多路复用:可以同时监控多个文件描述符,提高了应用程序对输入输出操作的管理能力。信号驱动式IO:可以让应用程序在数据准备好时接收到通知,避免了轮询。异步IO:可以让应用程序在发起异步读写操作后立即返回,并不等待操作完成。

每种模型都有其优缺点,具体使用哪种模型取决于应用场景。

Linux的I/O模型可以运用到所有的文件读写。在Linux中,一切设备皆文件,我们可以使用系统调用中的I/O函数(如open()、close()、write()、read()等)对文件进行相应的操作。但是下面是以网络作为例子。

网络请求过程

当一个网络请求到达操作系统时,涉及到多个步骤,下面是一个典型的流程:

网络请求被网卡接收:当一个网络请求到达操作系统时,它首先被网卡接收。网卡会将请求包转发给操作系统内核进行处理。内核处理网络请求:操作系统内核会对网络请求进行处理,包括验证网络协议、校验和、解析IP地址和端口号等。内核将请求包交给合适的套接字:一旦内核验证了网络请求,它会将请求包交给操作系统内部的套接字。套接字是操作系统提供的一种抽象接口,它允许用户进程通过网络进行通信。用户进程获取请求:当请求包被套接字接收时,它会触发一个中断,操作系统内核会将请求包中的数据复制到用户进程的内存中。这个过程是通过系统调用实现的。用户进程处理请求:一旦用户进程接收到请求,它可以进行相应的处理,比如解析请求包中的内容、处理请求、生成响应等。在处理请求的过程中,用户进程可以读取和写入套接字。用户进程响应请求:一旦用户进程处理完请求,它会生成一个响应,并将响应写入套接字。响应包将由操作系统内核转发回客户端。内核处理响应包:当响应包被套接字接收时,它会触发一个中断。操作系统内核会将响应包中的数据复制到内核缓冲区中,并开始处理响应。内核会根据响应包中的地址和端口号将响应转发给正确的客户端。

当一个网络请求到达操作系统时,它需要经过多个步骤才能到达用户进程,并最终生成响应。操作系统内核负责处理网络请求、管理套接字和转发响应,而用户进程负责处理请求和生成响应。

什么是网络监听

在上面中有一个步骤是网络请求到达网卡和内核将网卡的网络数据封装到对应的套接字中。用户线程并不知道网卡有数据到达操作系统的套接字(因为对应的套接字在系统内核空间中),所以需要进行网络监听,及向操作系统询问对应的套节字是否有数据到达,所以监听的过程实际上是向操作系统询问是否有数据到达内核空间的套接字。

什么是读取套接字

网络监听只是知道了有数据到达系统内核空间的套接字,但是对应的数据并不在用户空间中,所以,用户空间需要调用系统调用,将操作系统内核空间的套接字数据复制到用户空间,这个过程也需要花费时间。

I/O模型的优化

Linux的I/O模型是对用户进程从套接字读取和写入数据的步骤进行优化。在一个典型的网络应用中,数据通常通过套接字进行传输,而套接字是一个系统调用接口,它允许用户进程从操作系统内核中读取和写入数据。因此,套接字通信的效率直接影响到整个应用的性能。

不同的I/O模型提供了不同的优化策略,比如阻塞I/O模型会阻塞用户进程直到数据传输完成,非阻塞I/O模型会通过轮询的方式实现数据传输,I/O复用模型会允许用户进程同时处理多个套接字,信号驱动I/O模型会通过信号通知用户进程数据传输完成,而异步I/O模型则通过回调函数的方式通知用户进程数据传输完成。

因此,Linux的I/O模型是对用户进程从套接字读取和写入数据的步骤进行优化,以提高应用程序的性能和可伸缩性。

阻塞I/O

阻塞I/O(Blocking I/O)是一种I/O模型,它会阻塞用户进程直到数据传输完成。在阻塞I/O模型中,在阻塞式IO模型中,如果内核缓冲区中没有数据可读,那么read函数会一直阻塞,直到有数据可读为止。在这段时间里,应用进程无法执行其他操作。(在调用accept的过程会发生阻塞直至有客户端和服务器建立连接)

在阻塞I/O模型中,用户进程会通过系统调用进入内核,系统调用会一直阻塞用户进程,直到数据传输完成。在这个过程中,用户进程会一直占用CPU时间片,无法处理其他任务,因此阻塞I/O模型通常会导致应用程序的性能和可伸缩性受到限制。

下面是阻塞I/O模型的一些特点:

阻塞I/O模型是一种简单的I/O模型,易于实现和使用。当用户进程执行I/O操作时,如果数据没有准备好或无法立即发送,那么用户进程会被阻塞直到数据准备好或发送完成。阻塞I/O模型会导致应用程序的性能和可伸缩性受到限制,因为在数据传输的过程中,用户进程会一直占用CPU时间片,无法处理其他任务。阻塞I/O模型通常适用于单线程、同步、串行的应用程序,比如文件传输、打印机等。

虽然阻塞I/O模型在某些情况下非常实用,但是在高并发、大规模应用中,阻塞I/O模型的性能和可伸缩性会变得非常糟糕,因此需要采用其他的I/O模型,比如非阻塞I/O、I/O复用、信号驱动I/O和异步I/O等。

非阻塞I/O模型

非阻塞IO是指在进行输入输出操作时,程序不会阻塞等待结果返回,而是可以继续执行其他操作,等到结果返回时再去获取结果。非阻塞IO的accept和read都不需要阻塞。这意味着当你调用这些函数时,它们会立即返回一个结果。这种方式通常与异步IO配合使用,以提高程序的性能和响应速度。

在传统的阻塞IO中,程序会一直等待IO操作完成,直到结果返回后才会继续执行。这种方式会导致程序长时间处于等待状态,浪费CPU资源。而非阻塞IO可以在IO操作的同时,执行其他操作,从而提高程序的效率和吞吐量。

实现非阻塞IO的方式一般是通过轮询或事件驱动的方式。轮询是指程序会不断地询问IO操作是否完成,如果没有完成就会继续执行其他操作,直到IO操作完成为止。事件驱动则是通过注册事件回调函数,当IO操作完成时自动调用该回调函数,从而实现非阻塞IO操作。

在使用非阻塞IO时,需要注意一些问题。首先,由于程序并不会等待IO操作完成,因此需要采用一些方式来判断IO操作是否完成,例如轮询或事件回调。其次,在使用非阻塞IO时,需要考虑线程安全和竞争条件等问题,以保证程序的正确性和稳定性。

accept

非阻塞I/O的accept函数不会阻塞,因为它是在非阻塞模式下工作的。当没有新的连接请求到达时,accept函数会立即返回-1,并且设置errno为EAGAIN或EWOULDBLOCK。这表示当前没有新的连接请求需要处理。

在使用非阻塞I/O的情况下,通常需要使用select、poll或epoll等I/O多路复用机制来监听多个套接字的状态变化,包括是否有新的连接请求到来。当有新的连接请求时,可以使用accept函数接受连接,并将新的客户端套接字设置为非阻塞模式。

需要注意的是,使用非阻塞I/O需要谨慎处理返回值和错误码,避免出现死循环或错误的处理逻辑。通常需要使用状态机等技术来处理I/O事件。

read

在非阻塞I/O模式下,read函数会立即返回,而不会等待数据到达。如果没有数据可用,read函数将返回-1,并设置errno为EAGAIN或EWOULDBLOCK,表示当前没有数据可用。

I/O多路复用

I/O多路复用是一种通用的高效I/O处理机制,它允许一个进程可以同时监视多个文件描述符(套接字),并且可以在其中任何一个文件描述符上等待数据可读或可写,从而实现并发I/O操作。

I/O多路复用机制通常由select、poll和epoll等系统调用实现。这些系统调用允许用户进程注册一组文件描述符,并且在这些文件描述符上等待某些事件的发生,例如是否有数据可读或可写。当有数据可读或可写时,进程可以进行相应的操作,例如读取数据或将数据写入文件。

在网络编程中,I/O多路复用通常用于同时监听多个套接字,以实现高效的并发网络通信。通过使用I/O多路复用机制,服务器进程可以同时处理多个客户端连接请求,并且可以避免使用阻塞I/O导致的性能瓶颈和资源浪费问题。相比之下,非阻塞I/O需要对每个套接字分别进行操作,而I/O多路复用可以在一个线程中处理多个套接字,从而减少系统调用的次数和CPU使用率。

I/O多路复用是一种高效的I/O处理机制,可以用于实现并发网络通信和其他需要同时监视多个文件描述符的应用场景。

select

select函数的原型是

int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);

它接收五个参数,分别是最大的fd值加1、可读fd集合、可写fd集合、异常fd集合和超时时间。

select函数在用户空间和内核空间之间有两次数据拷贝,一次是将用户空间的fd集合拷贝到内核空间,另一次是将就绪的fd集合拷贝回用户空间。这些拷贝操作会消耗一定的时间和内存。

select函数在内核空间中调用了do_select函数,该函数遍历所有的fd,并调用每个fd对应的设备驱动程序中的poll方法,检查设备是否就绪。如果有设备就绪,就将其加入到就绪链表中,并唤醒等待在select上的进程。如果没有设备就绪,就将进程挂起到等待队列中,并设置超时定时器。

do_select函数返回后,select函数会检查返回值和信号状态,如果有信号发生或者超时发生,就返回错误码或者0。如果有设备就绪,就将就绪链表中的设备复制到用户空间的fd集合中,并返回就绪设备的数量。

poll_wqueues

poll_wqueues结构体用于维护一组等待队列(wait queue)和相关的信息,以在内核实现的select/poll/epoll等函数中实现等待多个文件描述符上的事件。当某个进程需要对socket进行读写的时候,如果发现此socket并不能读写, 那么就可以添加到此socket的等待队列中进行休眠,当此socket可以读写时再唤醒队列中的进程。具体来说,该结构体的各个成员的作用如下:

poll_table pt:一个包含若干等待队列的poll_table结构体,用于将所有的等待队列组织起来,方便select/poll/epoll等函数中进行操作。struct poll_table_page *table:一个指向分配的等待队列页的指针。如果在等待多个文件描述符上的事件时需要分配更多的等待队列空间,则会在此指针所指向的页面上分配。struct task_struct *polling_task:一个指向当前正在等待的进程的指针。在进行多路IO复用时,可能需要让一个进程等待多个文件描述符的事件。此时,该指针用于记录当前正在等待的进程,以便进行进程调度等操作。int triggered:一个标志位,表示是否有任何一个文件描述符上发生了事件。在等待文件描述符上的事件时,如果有任何一个文件描述符上发生了事件,则会将该标志位设置为1。int error:一个标志位,表示是否在等待过程中发生了错误。如果在等待文件描述符上的事件时出现了错误,则会将该标志位设置为一个非零值,以表示出现了错误。int inline_index:用于记录当前poll_table中内联的poll_table_entry的个数。struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]:一个包含N_INLINE_POLL_ENTRIES个poll_table_entry结构体的数组,用于存放内联的等待队列。如果待等待的文件描述符个数较少,则可以将其放入内联数组中,以减少内存开销。struct poll_wqueues { poll_table pt; struct poll_table_page *table; struct task_struct *polling_task; int triggered; int error; int inline_index; struct poll_table_entry inline_entries[N_INLINE_POLL_ENTRIES]; }; poll_table

poll_table 结构体用于注册和传递等待条件,通常与 poll_wait() 函数一起使用。

在 Linux 内核中,当驱动程序或其他内核代码需要等待某个事件发生时,可以使用 poll_table 结构体来指定等待的事件类型。在 poll_wait() 函数调用期间,进程被标记为等待状态,并且将传递的 poll_table 结构体存储在等待队列中。当事件发生时,内核将通知等待队列中的所有进程,并让它们恢复运行。

poll_table 结构体包含两个字段:

_qproc 字段是一个函数指针,指向一个回调函数。该函数将在 poll_wait() 被调用时使用,并在事件发生时执行。回调函数的参数是指向 poll_table_entry 结构体的指针和等待条件(如 EPOLLIN 或 EPOLLOUT)。_key 字段指定了等待条件,它的值应该是 EPOLLIN、EPOLLOUT、EPOLLERR 或 EPOLLHUP 之一。如果需要等待多个条件,则应该将这些条件按位或运算组合在一起。typedef struct poll_table_struct { poll_queue_proc _qproc; __poll_t _key; } poll_table_page

poll_table_page结构体的作用是作为多路复用机制中的等待队列,用于存储需要等待的文件描述符以及它们感兴趣的事件类型。它包含以下成员:

next:指向下一个poll_table_page结构体的指针,用于形成链表。entry:指向一个poll_table_entry结构体的指针,表示该页的第一个entry,用于快速遍历所有的entry。entries:poll_table_entry结构体的数组,用于存储多个等待的文件描述符及其关注的事件类型。

当一个进程调用poll函数时,它需要传递一个poll_table结构体作为参数,并在该poll_table中指定关心的事件类型和等待的文件描述符。poll函数内部会创建一个poll_table_page结构体,并将其中的poll_table_entry结构体用于表示一个文件描述符以及它感兴趣的事件类型。在多个进程都需要等待同一个文件描述符的情况下,这些poll_table_page结构体会以链表的形式组成一个等待队列,以便内核能够更快速地遍历所有等待该文件描述符的进程,从而快速唤醒它们。

struct poll_table_page { struct poll_table_page * next; struct poll_table_entry * entry; struct poll_table_entry entries[]; }; poll_table_entry

结构体定义了一个等待队列的入口,用于将进程添加到相应的等待队列中。具体来说:

filp:指向被监视的文件对象,表示当前等待队列是为哪个文件对象的IO事件而创建的。key:表示关心的事件类型,与poll_table中的key作用相同。wait:等待队列的链表节点,用于在进程等待队列中进行连接。wait_address:等待队列头指针的指针,用于在内核中寻找该等待队列头的地址。

poll_table_entry结构体通常嵌套在poll_table_page结构体中,用于表示一个等待队列中的一个节点。当进程调用poll函数时,内核将使用该结构体表示进程需要等待的事件,并将其添加到相应的等待队列中。

struct poll_table_entry { struct file *filp; __poll_t key; wait_queue_entry_t wait; wait_queue_head_t *wait_address; }; do_select

该函数是Linux内核中实现select系统调用的主要函数之一。它的作用是等待一组文件描述符中的某些就绪,并返回就绪的文件描述符集合。

该函数的主要参数包括:

n:文件描述符集合中最大的文件描述符编号加1;fds:要等待的文件描述符集合,包括三个成员:in、out、ex,分别表示要等待的文件描述符集合的输入、输出和异常事件;end_time:等待的超时时间。

该函数的主要流程如下:

调用max_select_fd函数,获取要等待的文件描述符集合中的最大文件描述符编号加1。初始化poll_wqueues结构体和poll_table结构体,将poll_table结构体与poll_wqueues结构体关联起来,并设置超时时间。使用循环依次处理文件描述符集合中的每个文件描述符。首先判断该文件描述符是否需要等待事件。如果不需要,就继续处理下一个文件描述符;否则,使用vfs_poll函数等待事件,并根据等待结果设置相应的标志位,最终将结果存储到res_in、res_out、res_ex等变量中,并将相应的标志位置为1。在每次处理完一个文件描述符后,判断是否有事件已经就绪,如果有,则将相应的标志位置为1,并增加返回值retval的值。如果所有文件描述符都处理完毕,但没有任何事件就绪,则判断是否超时或者是否有信号等待处理。如果超时或有信号,则退出循环;否则,等待下一次事件发生。最后,释放poll_wqueues结构体和poll_table结构体,返回函数执行结果。

在等待事件的过程中,该函数使用了一些优化技巧,包括:

采用循环处理每个文件描述符,避免使用逐个遍历的方式;根据需要等待的事件类型,只等待相应的事件,并不需要等待所有事件;在等待事件时,采用了一些优化技巧,包括忙等待和时间片轮转等,以提高等待事件的效率;在等待事件时,使用了类似于epoll的机制,将等待事件的文件描述符挂到一个等待队列中,当事件就绪时,唤醒等待队列中的任务。static int do_select(int n, fd_set_bits *fds, struct timespec64 *end_time) { ktime_t expire, *to = NULL; struct poll_wqueues table; poll_table *wait; int retval, i, timed_out = 0; u64 slack = 0; __poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0; unsigned long busy_start = 0; rcu_read_lock(); retval = max_select_fd(n, fds); rcu_read_unlock(); if (retval < 0) return retval; n = retval; poll_initwait(&table); wait = &table.pt; //--------上面的代码初始化等待队列------------- if (end_time && !end_time->tv_sec && !end_time->tv_nsec) { wait->_qproc = NULL; timed_out = 1; } if (end_time && !timed_out) slack = select_estimate_accuracy(end_time); retval = 0; //--------检查是否超时,超时则设置一个合理的延迟时间------ for (;;) { unsigned long *rinp, *routp, *rexp, *inp, *outp, *exp; bool can_busy_loop = false; inp = fds->in; outp = fds->out; exp = fds->ex; rinp = fds->res_in; routp = fds->res_out; rexp = fds->res_ex; //--------------这里定义了多个指针,分别指向文件描述符集合中可读、可写和异常描述符的位掩码数组,以及指向每个位掩码对应的结果数组 for (i = 0; i < n; ++rinp, ++routp, ++rexp) { unsigned long in, out, ex, all_bits, bit = 1, j; unsigned long res_in = 0, res_out = 0, res_ex = 0; __poll_t mask; in = *inp++; out = *outp++; ex = *exp++; all_bits = in | out | ex; if (all_bits == 0) { i += BITS_PER_LONG;//64位的操作系统BITS_PER_LONG的值是64。这个语句的作用是跳过一组文件描述符 continue; } //------首先获取当前文件描述符的可读、可写和异常状态位掩码(in、out 和 ex)。然后计算它们的按位或,将结果保存到 all_bits 中。如果 all_bits 的值为 0,则说明当前文件描述符没有任何状态位被设置,直接跳过本次循环 //j是64,遍历64次找到对应的位进行操作, for (j = 0; j < BITS_PER_LONG; ++j, ++i, bit _qproc = NULL; } if ((mask & POLLOUT_SET) && (out & bit)) { res_out |= bit; retval++; wait->_qproc = NULL; } if ((mask & POLLEX_SET) && (ex & bit)) { res_ex |= bit; retval++; wait->_qproc = NULL; } /* got something, stop busy polling */ if (retval) { can_busy_loop = false; busy_flag = 0; /* * only remember a returned * POLL_BUSY_LOOP if we asked for it */ } else if (busy_flag & mask) can_busy_loop = true; } if (res_in) *rinp = res_in; if (res_out) *routp = res_out; if (res_ex) *rexp = res_ex; cond_resched(); } wait->_qproc = NULL; if (retval || timed_out || signal_pending(current)) break; if (table.error) { retval = table.error; break; } /* only if found POLL_BUSY_LOOP sockets && not out of time */ if (can_busy_loop && !need_resched()) { if (!busy_start) { busy_start = busy_loop_current_time(); continue; } if (!busy_loop_timeout(busy_start)) continue; } busy_flag = 0; /* * If this is the first loop and we have a timeout * given, then we convert to ktime_t and set the to * pointer to the expiry value. */ if (end_time && !to) { expire = timespec64_to_ktime(*end_time); to = &expire; } if (!poll_schedule_timeout(&table, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; } poll_freewait(&table); return retval; } vfs_poll

这个函数的作用是查询一个文件描述符的读写状态,并返回一个表示可读或可写的 mask 值。如果文件描述符没有 poll 函数,就返回默认的 mask 值。这个函数还可以在 poll 表中注册一个回调函数,用来在文件描述符可读或可写时唤醒相关进程。

static inline __poll_t vfs_poll(struct file *file, struct poll_table_struct *pt) { if (unlikely(!file->f_op->poll)) return DEFAULT_POLLMASK; return file->f_op->poll(file, pt); } socket_poll

在上面的poll实际上调用的是socket_poll,该函数的作用是处理对一个套接字文件描述符的轮询操作(polling)请求。

具体来说,该函数的输入参数为一个 file 结构体指针和一个 poll_table 结构体指针,其中 file 结构体指向一个套接字文件描述符,poll_table 结构体用于等待事件的发生。该函数的返回值是一个 __poll_t 类型的值,表示哪些事件已经发生。

函数内部的第一行代码 struct socket *sock = file->private_data; 将套接字文件描述符所对应的 private_data 指针转换为一个 struct socket 类型的指针,以便后续对套接字的操作。

接下来的一行代码 __poll_t events = poll_requested_events(wait), flag = 0; 获取了调用 poll 系统调用时请求的事件集合,并初始化一个标志变量 flag。

接着的代码 if (!sock->ops->poll) return 0; 检查套接字的操作结构体中是否存在 poll 函数指针,如果不存在则直接返回 0。

然后代码 if (sk_can_busy_loop(sock->sk)) { 检查套接字是否支持繁忙轮询(busy polling),如果支持则进行下一步操作。

在此之后的代码 if (events & POLL_BUSY_LOOP) sk_busy_loop(sock->sk, 1); 判断系统调用是否请求进行繁忙轮询,如果是,则调用 sk_busy_loop 函数执行一次繁忙轮询操作。

然后代码 flag = POLL_BUSY_LOOP; 将标志变量 flag 设置为表示繁忙轮询的标志。

最后的代码 return sock->ops->poll(file, sock, wait) | flag; 调用套接字操作结构体中的 poll 函数,并将其返回值与标志变量 flag 的值进行按位或操作,并将结果作为函数的返回值。

因此,该函数的作用是:处理套接字文件描述符的轮询操作请求,包括获取请求的事件集合、执行繁忙轮询操作、调用套接字操作结构体中的 poll 函数,并返回哪些事件已经发生。

static __poll_t sock_poll(struct file *file, poll_table *wait) { struct socket *sock = file->private_data; __poll_t events = poll_requested_events(wait), flag = 0; if (!sock->ops->poll) return 0; if (sk_can_busy_loop(sock->sk)) { /* poll once if requested by the syscall */ if (events & POLL_BUSY_LOOP) sk_busy_loop(sock->sk, 1);//阻塞IO时使用 /* if this socket can poll_ll, tell the system call */ flag = POLL_BUSY_LOOP; } return sock->ops->poll(file, sock, wait) | flag; } tcp_poll

假设这个链接是一个IP4的TCP连接,那上面的poll函数实际上调用的是tcp_poll。

这是一个用于TCP套接字的poll函数的实现。poll函数是用于查询一个或多个文件描述符的状态变化的系统调用,该函数可以在文件描述符上等待一个或多个事件的发生,并且可以选择阻塞或非阻塞等待模式。

该函数接收三个参数:一个指向表示套接字的文件结构体的指针,一个指向套接字的结构体的指针,以及一个指向poll_table结构体的指针,后者包含了一组需要等待的事件掩码。

该函数的主要工作如下:

等待事件的发生。该函数通过调用sock_poll_wait函数来等待套接字的事件,如可读、可写等等。判断套接字状态。如果套接字处于TCP_LISTEN状态,表示正在监听连接请求,因此调用inet_csk_listen_poll函数来查询是否有新的连接请求。判断套接字是否关闭。如果套接字已经关闭或者已经进入TCP_CLOSE状态,则设置EPOLLHUP标志,表示该套接字已经失效。判断套接字是否可读。如果套接字的接收缓冲区中有可读数据,则设置EPOLLIN标志,表示该套接字可以读取数据。同时,如果接收缓冲区中的紧急数据存在,则设置EPOLLPRI标志。判断套接字是否可写。如果套接字可以写入数据,则设置EPOLLOUT标志,表示该套接字可以写入数据。如果套接字不可写,则调用sk_set_bit函数设置SOCKWQ_ASYNC_NOSPACE位,表示套接字写队列已满,需要等待可写事件的发生。判断套接字是否存在错误。如果套接字出现错误,则设置EPOLLERR标志,表示该套接字已经出错。返回事件掩码。将所有设置的事件标志按位或运算后返回,表示该套接字当前可以进行的操作。__poll_t tcp_poll(struct file *file, struct socket *sock, poll_table *wait) { __poll_t mask; struct sock *sk = sock->sk; const struct tcp_sock *tp = tcp_sk(sk); int state; sock_poll_wait(file, sock, wait); state = inet_sk_state_load(sk); if (state == TCP_LISTEN) return inet_csk_listen_poll(sk); mask = 0; if (sk->sk_shutdown == SHUTDOWN_MASK || state == TCP_CLOSE) mask |= EPOLLHUP; if (sk->sk_shutdown & RCV_SHUTDOWN) mask |= EPOLLIN | EPOLLRDNORM | EPOLLRDHUP; /* Connected or passive Fast Open socket? */ if (state != TCP_SYN_SENT && (state != TCP_SYN_RECV || rcu_access_pointer(tp->fastopen_rsk))) { int target = sock_rcvlowat(sk, 0, INT_MAX); u16 urg_data = READ_ONCE(tp->urg_data); if (unlikely(urg_data) && READ_ONCE(tp->urg_seq) == READ_ONCE(tp->copied_seq) && !sock_flag(sk, SOCK_URGINLINE)) target++; if (tcp_stream_is_readable(sk, target)) mask |= EPOLLIN | EPOLLRDNORM; if (!(sk->sk_shutdown & SEND_SHUTDOWN)) { if (__sk_stream_is_writeable(sk, 1)) { mask |= EPOLLOUT | EPOLLWRNORM; } else { /* send SIGIO later */ sk_set_bit(SOCKWQ_ASYNC_NOSPACE, sk); set_bit(SOCK_NOSPACE, &sk->sk_socket->flags); smp_mb__after_atomic(); if (__sk_stream_is_writeable(sk, 1)) mask |= EPOLLOUT | EPOLLWRNORM; } } else mask |= EPOLLOUT | EPOLLWRNORM; if (urg_data & TCP_URG_VALID) mask |= EPOLLPRI; } else if (state == TCP_SYN_SENT && inet_sk(sk)->defer_connect) { mask |= EPOLLOUT | EPOLLWRNORM; } smp_rmb(); if (sk->sk_err || !skb_queue_empty_lockless(&sk->sk_error_queue)) mask |= EPOLLERR; return mask; } sock_poll_wait检查poll_table中是否需要等待事件,如果不需要,则直接返回。如果需要等待事件,则调用poll_wait()函数等待套接字的可用性事件,同时将poll_table的等待队列添加到套接字的等待队列中。在添加等待队列之后,需要执行一个内存屏障来同步套接字标志的修改。

总之,该函数的作用是将poll_table中的等待队列添加到套接字的等待队列中,并等待套接字的可用性事件。

static inline void sock_poll_wait(struct file *filp, struct socket *sock, poll_table *p) { if (!poll_does_not_wait(p)) { poll_wait(filp, &sock->wq.wait, p); /* We need to be sure we are in sync with the * socket flags modification. * * This memory barrier is paired in the wq_has_sleeper. */ smp_mb(); } } poll_wait

poll_wait函数是用在poll系统调用中的,它的作用是将当前进程添加到指定的等待队列中,以便在设备状态发生变化时能够被唤醒。

poll_wait函数有三个参数:filp、wait_address和p。filp是一个文件结构体指针,表示要监视的设备;wait_address是一个等待队列头指针,表示要加入的等待队列;p是一个poll_table结构体指针,表示存放文件描述符集合的表。

poll_wait函数的工作流程如下:

检查p和p->_qproc是否为空,如果为空则直接返回;检查wait_address是否为空,如果为空则直接返回;调用p->_qproc函数,将filp和wait_address作为参数传入;p->_qproc函数会将filp添加到wait_address所指向的等待队列中,并设置相应的回调函数;当设备状态发生变化时(比如可读或可写),回调函数会被执行,并唤醒等待队列中的进程。

所以,poll_wait函数完成了将当前进程注册到设备的等待队列中,并使其能够在设备状态变化时被唤醒的工作。

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p) { if (p && p->_qproc && wait_address) p->_qproc(filp, wait_address, p); } poll_freewait

poll_freewait函数是用在poll系统调用结束后的,它的作用是清理之前创建的poll_table结构体,并将等待队列中的元素从相应的等待队列头中移除。

poll_freewait函数有一个参数:table。table是一个poll_wqueues结构体指针,表示存放文件描述符集合和等待队列元素的表。

poll_freewait函数的工作流程如下:

遍历table->poll_list链表,对每个元素执行以下操作:获取该元素所属的文件描述符和等待队列头;将该元素从等待队列头中删除;如果该文件描述符有就绪事件,则将其保存在table->data中;释放该元素占用的内存空间;返回table->data中保存的就绪事件数量。

poll_freewait函数完成了清理内存资源和获取就绪事件结果的工作。

poll

多路复用的poll实际上调用的是do_poll。其实poll和select的原理差不多,就不进行解释了。

do_poll 函数实现了 poll() 系统调用的核心功能,它轮询指定的文件描述符并等待其上的事件,直到其中至少一个描述符上发生了事件或者等待时间截止。具体来说,该函数执行以下步骤:

为等待队列分配一个 poll_table 结构体,并初始化其 _qproc 成员为 NULL,以便在将来将它们添加到等待队列时使用。如果指定了超时时间,则计算时间戳的精度,并将 slack 设置为该精度,以便更准确地计算超时。在一个无限循环中,遍历指定的文件描述符列表,对于每个文件描述符,调用 do_pollfd 函数,将其添加到等待队列中,并尝试检测它是否已准备好进行 I/O 操作。如果检测到 I/O 事件,则增加 count 的计数器,并将 pt->_qproc 成员设置为 NULL。当所有文件描述符都被检查过一次后,检查 count 的计数器。如果计数器大于零,则至少有一个文件描述符上发生了事件,函数将退出。否则,函数将根据等待队列的状态决定是等待更多事件还是退出等待。如果发现 POLL_BUSY_LOOP 描述符(即可能会忙等待),并且系统没有到达调度时限,则使用忙等待的方式等待事件发生。在此期间,函数会定期检查当前时间是否已经超过了最大忙等待时间,如果已经超时,则退出忙等待。如果有超时时间且 to 为 NULL,则将其设置为当前超时时间,并转换为 ktime_t 格式。然后使用 poll_schedule_timeout 函数等待事件发生,如果超时则将 timed_out 标志设置为 1 并退出等待。如果有事件发生,则 count 的计数器将增加,并退出等待。

最终,do_poll 函数返回等待事件的文件描述符数量。

static int do_poll(struct poll_list *list, struct poll_wqueues *wait, struct timespec64 *end_time) { poll_table* pt = &wait->pt; ktime_t expire, *to = NULL; int timed_out = 0, count = 0; u64 slack = 0; __poll_t busy_flag = net_busy_loop_on() ? POLL_BUSY_LOOP : 0; unsigned long busy_start = 0; /* Optimise the no-wait case */ if (end_time && !end_time->tv_sec && !end_time->tv_nsec) { pt->_qproc = NULL; timed_out = 1; } if (end_time && !timed_out) slack = select_estimate_accuracy(end_time); for (;;) { struct poll_list *walk; bool can_busy_loop = false; for (walk = list; walk != NULL; walk = walk->next) { struct pollfd * pfd, * pfd_end; pfd = walk->entries; pfd_end = pfd + walk->len; for (; pfd != pfd_end; pfd++) { /* * Fish for events. If we found one, record it * and kill poll_table->_qproc, so we don't * needlessly register any other waiters after * this. They'll get immediately deregistered * when we break out and return. */ if (do_pollfd(pfd, pt, &can_busy_loop, busy_flag)) { count++; pt->_qproc = NULL; /* found something, stop busy polling */ busy_flag = 0; can_busy_loop = false; } } } /* * All waiters have already been registered, so don't provide * a poll_table->_qproc to them on the next loop iteration. */ pt->_qproc = NULL; if (!count) { count = wait->error; if (signal_pending(current)) count = -ERESTARTNOHAND; } if (count || timed_out) break; /* only if found POLL_BUSY_LOOP sockets && not out of time */ if (can_busy_loop && !need_resched()) { if (!busy_start) { busy_start = busy_loop_current_time(); continue; } if (!busy_loop_timeout(busy_start)) continue; } busy_flag = 0; /* * If this is the first loop and we have a timeout * given, then we convert to ktime_t and set the to * pointer to the expiry value. */ if (end_time && !to) { expire = timespec64_to_ktime(*end_time); to = &expire; } if (!poll_schedule_timeout(wait, TASK_INTERRUPTIBLE, to, slack)) timed_out = 1; } return count; } epoll

相比于select,epoll最大的好处在于它不会随着监听fd数目的增长而降低效率。因为在内核中的select实现中,它是采用轮询来处理的,轮询的fd数目越多,自然耗时越多。

epoll 和 select 最大的不同是 epoll 是基于事件驱动的,而 select 是基于轮询的。

在 select 中,每次调用 select 函数时,内核都需要轮询所有被 select 监听的文件描述符,来检查是否有事件发生。而在 epoll 中,首先需要通过 epoll_ctl 将文件描述符加入到 epoll 中,然后调用 epoll_wait 等待事件的发生。当有事件发生时,内核会通知应用程序,应用程序只需要处理这些发生事件的文件描述符即可。

此外,epoll 与 select 在支持的文件描述符数量上也有区别。select 支持的文件描述符数量通常受到系统内存限制,而 epoll 可以处理非常大的文件描述符集合,这是因为 epoll 内部使用的是红黑树来保存文件描述符,而非数组。

另外,当文件描述符集合变化时,select 需要重新调用 select 函数来更新内核维护的文件描述符集合,而 epoll 只需要调用 epoll_ctl 来修改文件描述符集合即可。

综上所述,epoll 相对于 select,具有更高的效率和更好的扩展性,特别是当需要处理大量文件描述符时,epoll 的优势更加明显。

eventpoll

表示一个事件轮询(eventpoll)对象。该结构体中的各个成员变量用于描述eventpoll对象的各个方面,包括:

互斥锁(mtx):用于保护eventpoll对象中的文件集合,在事件收集循环、文件清理路径、epoll文件退出代码和ctl操作期间防止文件被删除。等待队列(wq):用于由sys_epoll_wait()函数等待eventpoll对象上的事件。另一个等待队列(poll_wait):用于在文件的poll()操作中等待eventpoll对象上的事件。双向链表(rdllist):用于存储已准备好的文件描述符。读写锁(lock):用于保护rdllist和ovflist。红黑树(rbr):用于存储被监视的文件描述符。单链表(ovflist):用于链式存储准备好的事件,在将事件传递给用户空间时使用。wakeup_source(ws):在ep_scan_ready_list运行时使用的wakeup_source。user_struct(user):创建eventpoll描述符的用户。file:指向eventpoll文件的指针。gen:用于优化循环检测检查。hlist_head(refs):用于存储refcount对象的哈希表头。napi_id(可选):用于跟踪忙碌轮询napi_id。nests(可选):用于跟踪唤醒嵌套,以进行锁定依赖性验证。struct eventpoll { /* * This mutex is used to ensure that files are not removed * while epoll is using them. This is held during the event * collection loop, the file cleanup path, the epoll file exit * code and the ctl operations. */ struct mutex mtx; /* Wait queue used by sys_epoll_wait() */ wait_queue_head_t wq; /* Wait queue used by file->poll() */ wait_queue_head_t poll_wait; /* List of ready file descriptors */ struct list_head rdllist; /* Lock which protects rdllist and ovflist */ rwlock_t lock; /* RB tree root used to store monitored fd structs */ struct rb_root_cached rbr; /* * This is a single linked list that chains all the "struct epitem" that * happened while transferring ready events to userspace w/out * holding ->lock. */ struct epitem *ovflist; /* wakeup_source used when ep_scan_ready_list is running */ struct wakeup_source *ws; /* The user that created the eventpoll descriptor */ struct user_struct *user; struct file *file; /* used to optimize loop detection check */ u64 gen; struct hlist_head refs; #ifdef CONFIG_NET_RX_BUSY_POLL /* used to track busy poll napi_id */ unsigned int napi_id; #endif #ifdef CONFIG_DEBUG_LOCK_ALLOC /* tracks wakeup nests for lockdep validation */ u8 nests; #endif }; epitem

struct epitem 是一个结构体,用于存储 epoll 监测的文件描述符和事件信息。它有以下成员:

rbn:一个红黑树节点,用于将这个结构体链接到 eventpoll 的红黑树中。

rcu:一个 RCU 头,用于释放这个结构体。

rdllink:一个链表头,用于将这个结构体链接到 eventpoll 的就绪列表中。

next:一个指针,用于将这个结构体链接到 eventpoll 的溢出列表中。

ffd:一个文件描述符信息结构体,包含了文件描述符和文件指针。

pwqlist:一个链表头,用于存储等待队列的入口。

ep:一个指向 eventpoll 对象的指针。

fllink:一个哈希表节点,用于将这个结构体链接到文件的项列表中。

ws:一个唤醒源指针,用于在 EPOLLWAKEUP 标志被设置时保持系统唤醒。

event:一个 epoll_event 结构体,描述了感兴趣的事件和源文件描述符。

struct epitem 的作用是在 epoll 中管理和传递文件描述符和事件的相关数据。

struct epitem { union { /* RB tree node links this structure to the eventpoll RB tree */ struct rb_node rbn; /* Used to free the struct epitem */ struct rcu_head rcu; }; /* List header used to link this structure to the eventpoll ready list */ struct list_head rdllink; /* * Works together "struct eventpoll"->ovflist in keeping the * single linked chain of items. */ struct epitem *next; /* The file descriptor information this item refers to */ struct epoll_filefd ffd; /* List containing poll wait queues */ struct eppoll_entry *pwqlist; /* The "container" of this item */ struct eventpoll *ep; /* List header used to link this item to the "struct file" items list */ struct hlist_node fllink; /* wakeup_source used when EPOLLWAKEUP is set */ struct wakeup_source __rcu *ws; /* The structure that describe the interested events and the source fd */ struct epoll_event event; }; epoll_event

struct epoll_event 是用来描述一个文件描述符上注册的事件的结构体,它包含两个成员变量:

events:用来表示该文件描述符上注册的事件类型,类型为 __poll_t,实际上是一个整数类型,该类型定义在 头文件中,可以表示多个事件类型,如 POLLIN、POLLOUT、POLLERR 等。data:用来存储与该文件描述符相关的数据,类型为 __u64,实际上是一个 64 位的无符号整数类型,可以存储一个指针或一个整数。

在 epoll 中,可以为一个文件描述符注册多个事件类型,例如一个 socket 可以注册 EPOLLIN 和 EPOLLOUT 事件,因此 struct epoll_event 可以表示多个事件类型。

struct epoll_event 是 epoll API 中用于描述事件的结构体,它通过 events 成员来表示事件类型,通过 data 成员来存储与事件相关的数据。

struct epoll_event { __poll_t events; __u64 data; } epoll_create

在使用epolll前需要调用epoll_create函数进行创建。epoll_create函数是Linux内核中创建epoll对象的系统调用epoll_create的实现。函数的主要作用是创建一个新的eventpoll对象和相应的文件描述符,以便用户进程可以使用它来监视多个文件描述符上的事件。

具体来说,函数执行以下步骤:

检查flags参数是否合法。如果flags参数中包含除EPOLL_CLOEXEC以外的其他标志,则返回EINVAL错误。调用ep_alloc()函数创建一个新的eventpoll对象。如果失败,则返回相应的错误码。调用get_unused_fd_flags()函数获取一个空闲的文件描述符。如果失败,则返回相应的错误码。调用anon_inode_getfile()函数创建一个匿名的inode,并将eventpoll对象绑定到该inode上,同时创建一个文件对象。如果失败,则返回相应的错误码。调用fd_install()函数将文件对象安装到指定的文件描述符上。返回新创建的文件描述符。如果在任一步骤中发生错误,则调用put_unused_fd()和ep_free()函数释放之前分配的资源,并返回相应的错误码。

在Linux内核中,每个进程都有一张打开文件表,用于管理该进程打开的所有文件。每个打开的文件都有一个文件描述符(file descriptor),它是一个非负整数,用于唯一标识该文件。当一个进程打开一个新文件时,内核会为该文件分配一个新的文件描述符。

在epoll_create函数中,需要为新创建的eventpoll对象分配一个新的文件描述符,以便用户进程可以使用它来监视多个文件描述符上的事件。为了避免与现有的文件描述符冲突,需要使用get_unused_fd_flags()函数获取一个空闲的文件描述符,以便将新创建的eventpoll对象绑定到该文件描述符上。

一旦完成了对eventpoll对象的使用,就需要释放相应的文件描述符和eventpoll对象。为了避免文件描述符资源的浪费,需要使用put_unused_fd()函数将之前获取的文件描述符释放回内核。同样,为了避免eventpoll对象的内存泄漏,需要使用ep_free()函数释放之前分配的eventpoll对象内存。这样,就可以确保在使用完eventpoll对象后,不会浪费任何内核资源。

static int do_epoll_create(int flags) { int error, fd; struct eventpoll *ep = NULL; struct file *file; /* Check the EPOLL_* constant for consistency. */ BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC); if (flags & ~EPOLL_CLOEXEC) return -EINVAL; /* * Create the internal data structure ("struct eventpoll"). */ error = ep_alloc(&ep); if (error < 0) return error; /* * Creates all the items needed to setup an eventpoll file. That is, * a file structure and a free file descriptor. */ fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC)); if (fd < 0) { error = fd; goto out_free_ep; } file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep, O_RDWR | (flags & O_CLOEXEC)); if (IS_ERR(file)) { error = PTR_ERR(file); goto out_free_fd; } ep->file = file; fd_install(fd, file); return fd; out_free_fd: put_unused_fd(fd); out_free_ep: ep_free(ep); return error; } epoll_ctl

epoll_ctl函数是 Linux 内核中 epoll 模块的核心函数之一,用于控制 epoll 实例中的文件描述符的事件。该函数的功能如下:

首先获取指定的 epoll 文件描述符 f 和目标文件描述符 tf,检查目标文件描述符是否支持 poll 操作,如果不支持,则返回 -EPERM 错误码。检查是否允许 EPOLLWAKEUP,如果操作符包含事件,则调用函数 ep_take_care_of_epollwakeup() 处理 EPOLLWAKEUP。检查操作符是否为 EPOLL_CTL_MOD,并且不允许使用 EPOLLEXCLUSIVE 事件,如果是,则返回错误码 -EPERM。检查 f 和 tf 是否相同,或者 tf 是否是另一个 epoll 文件描述符,如果是,则返回错误码 -EINVAL。获取 f 文件描述符中保存的 eventpoll 结构体实例,锁定该实例的 mtx 互斥锁。如果操作符为 EPOLL_CTL_ADD,则检查是否存在循环依赖的情况,以及是否超过了最大允许的触发路径。如果存在循环依赖,则返回错误码 -ELOOP。调用函数 ep_find() 在 eventpoll 实例的文件红黑树中查找 tf 文件描述符对应的 epitem 结构体实例。根据操作符执行相应的操作,如果是 EPOLL_CTL_ADD,则将 tf 文件描述符添加到 eventpoll 实例中,如果是 EPOLL_CTL_DEL,则从 eventpoll 实例中删除 tf 文件描述符,如果是 EPOLL_CTL_MOD,则修改 tf 文件描述符在 eventpoll 实例中的状态。解锁 mtx 互斥锁,返回操作结果。

do_epoll_ctl() 函数的主要作用是将文件描述符添加到 eventpoll 实例中,以便 epoll_wait() 函数可以在需要时触发该文件描述符的事件。

int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds, bool nonblock) { int error; int full_check = 0; struct fd f, tf; struct eventpoll *ep; struct epitem *epi; struct eventpoll *tep = NULL; error = -EBADF; f = fdget(epfd); if (!f.file) goto error_return; tf = fdget(fd); if (!tf.file) goto error_fput; error = -EPERM; if (!file_can_poll(tf.file)) goto error_tgt_fput; if (ep_op_has_event(op)) ep_take_care_of_epollwakeup(epds); error = -EINVAL; if (f.file == tf.file || !is_file_epoll(f.file)) goto error_tgt_fput; if (ep_op_has_event(op) && (epds->events & EPOLLEXCLUSIVE)) { if (op == EPOLL_CTL_MOD) goto error_tgt_fput; if (op == EPOLL_CTL_ADD && (is_file_epoll(tf.file) || (epds->events & ~EPOLLEXCLUSIVE_OK_BITS))) goto error_tgt_fput; } ep = f.file->private_data; error = epoll_mutex_lock(&ep->mtx, 0, nonblock); if (error) goto error_tgt_fput; if (op == EPOLL_CTL_ADD) { if (READ_ONCE(f.file->f_ep) || ep->gen == loop_check_gen || is_file_epoll(tf.file)) { mutex_unlock(&ep->mtx); error = epoll_mutex_lock(&epmutex, 0, nonblock); if (error) goto error_tgt_fput; loop_check_gen++; full_check = 1; if (is_file_epoll(tf.file)) { tep = tf.file->private_data; error = -ELOOP; if (ep_loop_check(ep, tep) != 0) goto error_tgt_fput; } error = epoll_mutex_lock(&ep->mtx, 0, nonblock); if (error) goto error_tgt_fput; } } epi = ep_find(ep, tf.file, fd); error = -EINVAL; switch (op) { case EPOLL_CTL_ADD: if (!epi) { epds->events |= EPOLLERR | EPOLLHUP; error = ep_insert(ep, epds, tf.file, fd, full_check); } else error = -EEXIST; break; case EPOLL_CTL_DEL: if (epi) error = ep_remove(ep, epi); else error = -ENOENT; break; case EPOLL_CTL_MOD: if (epi) { if (!(epi->event.events & EPOLLEXCLUSIVE)) { epds->events |= EPOLLERR | EPOLLHUP; error = ep_modify(ep, epi, epds); } } else error = -ENOENT; break; } mutex_unlock(&ep->mtx); error_tgt_fput: if (full_check) { clear_tfile_check_list(); loop_check_gen++; mutex_unlock(&epmutex); } fdput(tf); error_fput: fdput(f); error_return: return error; } epoll_wait

该函数实现了epoll_wait系统调用的具体功能,以下是每一步完成的工作:

首先检查maxevents参数的有效性,如果maxevents小于等于0或大于EP_MAX_EVENTS,返回-EINVAL错误。然后检查传递给该函数的events指针是否是有效的用户空间指针,如果不是,返回-EFAULT错误。获取epfd参数对应的文件描述符对应的文件结构体,即"struct file *",如果该文件结构体不存在,则返回-EBADF错误。检查该文件结构体是否是eventpoll类型的文件,如果不是,返回-EINVAL错误。获取该eventpoll文件对应的私有数据结构体"struct eventpoll *",即上面定义的ep。调用ep_poll函数,获取事件。释放文件描述符,返回事件数量或者错误代码。

总体而言,该函数的功能就是获取eventpoll文件关注的事件并将其填充到用户空间传递的events数组中,最后返回获取到的事件数量。

static int do_epoll_wait(int epfd, struct epoll_event __user *events, int maxevents, struct timespec64 *to) { int error; struct fd f; struct eventpoll *ep; /* The maximum number of event must be greater than zero */ if (maxevents EP_MAX_EVENTS) return -EINVAL; /* Verify that the area passed by the user is writeable */ if (!access_ok(events, maxevents * sizeof(struct epoll_event))) return -EFAULT; /* Get the "struct file *" for the eventpoll file */ f = fdget(epfd); if (!f.file) return -EBADF; error = -EINVAL; if (!is_file_epoll(f.file)) goto error_fput; /* * At this point it is safe to assume that the "private_data" contains * our own data structure. */ ep = f.file->private_data; /* Time to fish for events ... */ error = ep_poll(ep, events, maxevents, to); error_fput: fdput(f); return error; } ep_poll

ep_poll函数是实现 epoll_wait 的核心函数之一,用于从 eventpoll 中等待和获取事件。下面是该函数的具体分析:

对超时参数进行处理,并计算出最小的估算误差(slack)以及到期时间(expires)。获取可读取事件的数量(eavail)。如果有可读取事件,尝试将事件传输到用户空间。如果返回值为非零,表示出现错误,直接返回错误值。如果没有可读取事件,且超时时间为零,返回 0。如果没有可读取事件,且超时时间不为零,则进入忙等待状态,循环检查是否有可读取事件。如果出现中断信号,则返回 EINTR。如果有可读取事件,则直接进入下一次循环。如果没有可读取事件,则创建一个等待队列条目,并加入事件多路复用对象的等待队列(ep->wq)中。设置当前任务的状态为 TASK_INTERRUPTIBLE,并且在多路复用对象的自动删除唤醒函数(ep_autoremove_wake_function)中等待。从等待队列中移除等待条目,然后解锁事件多路复用对象的自旋锁(ep->lock)。如果超时时间到期,则返回 0。设置当前任务的状态为 TASK_RUNNING。设置可读取事件的数量为 1。如果等待条目不为空,且超时时间已到,则判断等待队列中是否还有其他等待条目,如果没有,则将可读取事件的数量设置为 0。

该函数的作用是在 eventpoll 中等待和获取事件,并将事件传输到用户空间。如果没有可读取事件,则会进入等待状态,直到超时或者有事件到达为止。在等待的过程中,该函数会使用忙等待和睡眠等待相结合的方式,以提高事件响应的效率。

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, struct timespec64 *timeout) { int res, eavail, timed_out = 0; u64 slack = 0; wait_queue_entry_t wait; ktime_t expires, *to = NULL; lockdep_assert_irqs_enabled(); if (timeout && (timeout->tv_sec | timeout->tv_nsec)) { slack = select_estimate_accuracy(timeout); to = &expires; *to = timespec64_to_ktime(*timeout); } else if (timeout) { /* * Avoid the unnecessary trip to the wait queue loop, if the * caller specified a non blocking operation. */ timed_out = 1; } /* * This call is racy: We may or may not see events that are being added * to the ready list under the lock (e.g., in IRQ callbacks). For cases * with a non-zero timeout, this thread will check the ready list under * lock and will add to the wait queue. For cases with a zero * timeout, the user by definition should not care and will have to * recheck again. */ eavail = ep_events_available(ep); while (1) { if (eavail) { /* * Try to transfer events to user space. In case we get * 0 events and there's still timeout left over, we go * trying again in search of more luck. */ res = ep_send_events(ep, events, maxevents); if (res) return res; } if (timed_out) return 0; eavail = ep_busy_loop(ep, timed_out); if (eavail) continue; if (signal_pending(current)) return -EINTR; init_wait(&wait); wait.func = ep_autoremove_wake_function; write_lock_irq(&ep->lock); __set_current_state(TASK_INTERRUPTIBLE); eavail = ep_events_available(ep); if (!eavail) __add_wait_queue_exclusive(&ep->wq, &wait); write_unlock_irq(&ep->lock); if (!eavail) timed_out = !schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS); __set_current_state(TASK_RUNNING); eavail = 1; if (!list_empty_careful(&wait.entry)) { write_lock_irq(&ep->lock); if (timed_out) eavail = list_empty(&wait.entry); __remove_wait_queue(&ep->wq, &wait); write_unlock_irq(&ep->lock); } } } ep_send_events

Linux内核中实现的ep_send_events函数,主要的功能是在epoll实例中扫描已经准备好的epitem并将事件传递到用户空间,同时将相关的epitem从rdllist中删除或重新插入。

具体地说,该函数的实现步骤如下:

检查当前进程是否有致命的信号挂起,如果有则立即返回,避免死循环。初始化poll_table,这是用于实现用户空间的poll()系统调用的数据结构。获取epoll实例的互斥锁,以确保该函数不会与其他并发操作相互干扰。调用ep_start_scan函数开始扫描已准备好的epitem,将它们添加到txlist中。对于txlist中的每个epitem,按以下步骤处理:检查是否已经处理足够数量的事件,如果是则跳出循环。激活epi的唤醒源,并将其从rdllist中删除。使用ep_item_poll函数检查当前epi的事件掩码是否与调用者请求的事件掩码相交,并返回已准备好的事件类型。如果没有准备好的事件,则继续循环下一个epitem。否则,将已准备好的事件传递给用户空间,并将其存储在events数组中。如果无法写入events数组,则将epi重新插入到txlist中,标记返回值为-EFAULT,并跳出循环。如果成功将事件写入events数组,则将返回值增加1。如果epi是使用EPOLLONESHOT注册的,则清除其事件标志;如果使用的是水平触发模式而不是边缘触发模式,则将其重新插入到rdllist中。调用ep_done_scan函数完成扫描,将所有epitem从txlist中删除,并根据需要重新插入到rdllist或ovflist中。释放epoll实例的互斥锁。返回已准备好的事件数量。

ep_send_events函数是用于处理已准备好的epitem,并将它们的事件传递给用户空间的核心函数,它在epoll实例的事件通知机制中起到了至关重要的作用。

static int ep_send_events(struct eventpoll *ep, struct epoll_event __user *events, int maxevents) { struct epitem *epi, *tmp; LIST_HEAD(txlist); poll_table pt; int res = 0; /* * Always short-circuit for fatal signals to allow threads to make a * timely exit without the chance of finding more events available and * fetching repeatedly. */ if (fatal_signal_pending(current)) return -EINTR; init_poll_funcptr(&pt, NULL); mutex_lock(&ep->mtx); ep_start_scan(ep, &txlist); /* * We can loop without lock because we are passed a task private list. * Items cannot vanish during the loop we are holding ep->mtx. */ list_for_each_entry_safe(epi, tmp, &txlist, rdllink) { struct wakeup_source *ws; __poll_t revents; if (res >= maxevents) break; /* * Activate ep->ws before deactivating epi->ws to prevent * triggering auto-suspend here (in case we reactive epi->ws * below). * * This could be rearranged to delay the deactivation of epi->ws * instead, but then epi->ws would temporarily be out of sync * with ep_is_linked(). */ ws = ep_wakeup_source(epi); if (ws) { if (ws->active) __pm_stay_awake(ep->ws); __pm_relax(ws); } list_del_init(&epi->rdllink); /* * If the event mask intersect the caller-requested one, * deliver the event to userspace. Again, we are holding ep->mtx, * so no operations coming from userspace can change the item. */ revents = ep_item_poll(epi, &pt, 1); if (!revents) continue; events = epoll_put_uevent(revents, epi->event.data, events); if (!events) { list_add(&epi->rdllink, &txlist); ep_pm_stay_awake(epi); if (!res) res = -EFAULT; break; } res++; if (epi->event.events & EPOLLONESHOT) epi->event.events &= EP_PRIVATE_BITS; else if (!(epi->event.events & EPOLLET)) { /* * If this file has been added with Level * Trigger mode, we need to insert back inside * the ready list, so that the next call to * epoll_wait() will check again the events * availability. At this point, no one can insert * into ep->rdllist besides us. The epoll_ctl() * callers are locked out by * ep_scan_ready_list() holding "mtx" and the * poll callback will queue them in ep->ovflist. */ list_add_tail(&epi->rdllink, &ep->rdllist); ep_pm_stay_awake(epi); } } ep_done_scan(ep, &txlist); mutex_unlock(&ep->mtx); return res; } epoll是如何实现事件触发的

epoll 是一种高效的 I/O 事件监测机制,它可以在 Linux 系统中监听多个文件描述符,并在 I/O 可以执行时通知进程。epoll 支持两种事件触发模式:边缘触发 (ET) 和水平触发 (LT) 。

在 epoll 中,水平触发(Level-Triggered,LT)和边缘触发(Edge-Triggered,ET)的实现方式有所不同。

在水平触发模式下,当一个文件描述符上有数据可读或可写时,内核会不断通知应用程序,直到所有数据都被读取或写入。应用程序在接收到通知后,可以选择读取或写入任意数量的数据,而不必担心是否会丢失数据。

在 epoll 中,水平触发模式是默认模式。当应用程序调用 epoll_wait 等待事件时,如果文件描述符上有事件发生,内核会返回一个标志,表示该文件描述符上有事件可以处理。

而在边缘触发模式下,内核只在文件描述符状态发生变化时才会通知应用程序。当应用程序接收到通知后,必须立即读取或写入所有可用数据,否则数据会被丢失。

在 epoll 中,可以通过设置 EPOLLET 标志将文件描述符设置为边缘触发模式。当应用程序调用 epoll_wait 等待事件时,如果文件描述符上的事件未被处理,则会阻塞等待,直到有新的事件发生为止。在边缘触发模式下,应用程序必须处理所有可用数据,并确保在下一次读取或写入之前不会发生数据丢失。

在select中我们了解到当一个事件到达后,通过回调函数完成了select中的唤醒,在epoll中通过回调数完成事件触发的,并将调用poll()函数将对应的文件描述加入以准备函数的链表中。也就是说没有实现poll的文件描述符是无法实现水平触发和边缘触发的事件通知机制的。

三者的区别

select、poll和epoll都是Linux下用于I/O多路复用的机制,但它们在实现和使用上有一些不同:

select和poll的缺点:在需要监视大量文件描述符时,每次调用select或poll都需要将待检查的文件描述符集合从用户态复制到内核态,这个过程会耗费大量时间。此外,select和poll对于大量的文件描述符时,采用轮询的方式来扫描所有的文件描述符,这样会导致系统的效率降低。epoll的优点:epoll通过epoll_ctl注册文件描述符,把关注的文件描述符加入到红黑树或哈希表中,epoll_wait则等待内核事件通知。这样可以避免了轮询操作,只需要在文件描述符上发生事件时才会通知,因此epoll的效率比select和poll高很多。同时,epoll支持边缘触发和水平触发两种模式,可以更好地满足应用程序的需求。epoll的缺点:epoll的使用比select和poll稍微复杂一些,需要用户程序维护一个事件就绪队列,同时需要为每个文件描述符分配一个状态结构体。这样可能会导致代码量增加,程序的维护和调试难度加大。

select和poll适用于少量文件描述符的场景,而epoll适用于大量文件描述符的场景。在高并发、高吞吐量的网络应用中,epoll是最好的选择。

信号驱动式I/O

信号驱动式 I/O(Signal-Driven I/O)是一种 I/O 多路复用的技术,可以用于异步处理 I/O 事件。

在信号驱动式 I/O 中,进程使用系统调用 sigaction() 来注册一个信号处理函数,该函数会在 I/O 事件就绪时被内核调用。当进程调用 sigaction() 注册一个信号处理函数时,它需要指定一个描述符和一个事件,内核在检测到该描述符上发生指定事件时,会向进程发送指定信号。进程可以通过捕获该信号并执行相应操作来实现异步 I/O。

与其他 I/O 多路复用技术相比,信号驱动式 I/O 的主要优点是可以在处理多个描述符时避免阻塞进程,提高了程序的并发性能和响应能力。同时,信号驱动式 I/O 也避免了轮询机制的开销,从而减少了 CPU 的占用。

但是,信号驱动式 I/O 也存在一些缺点。首先,它对信号的处理需要一定的时间,因此它不适合高速 I/O 操作。其次,由于信号是不可靠的,因此在使用信号驱动式 I/O 时需要考虑到信号可能会丢失的情况。最后,信号驱动式 I/O 在多个描述符之间切换时可能会存在竞争条件和死锁问题。

异步I/O模型

异步 I/O 是一种 I/O 处理方式,它允许一个进程在等待 I/O 操作完成时继续执行其他任务,而不必阻塞等待 I/O 完成。异步 I/O 通过操作系统提供的通知机制,在 I/O 操作完成时通知进程,以实现异步处理。

异步 I/O 模型的实现通常涉及三个主要组件:

请求:异步 I/O 的请求由应用程序发起,通常包括描述符、缓冲区地址和数据长度等参数。在发起异步 I/O 请求后,应用程序可以继续执行其他任务。内核:内核负责将异步 I/O 请求提交到 I/O 队列中,并将请求相关的数据保存在内核中。当 I/O 操作完成时,内核会向应用程序发送通知,以便应用程序可以处理 I/O 操作的结果。通知机制:通知机制用于将 I/O 完成的事件通知给应用程序,以便应用程序可以及时处理 I/O 操作的结果。通知机制通常采用信号、回调函数或者事件通知等方式。

异步 I/O 模型相比同步 I/O 模型的主要优势在于它可以让应用程序在等待 I/O 完成时继续执行其他任务,提高了程序的并发性能和响应能力。但是,异步 I/O 模型的实现比同步 I/O 模型更加复杂,需要使用操作系统提供的通知机制来处理 I/O 完成的事件,同时也需要考虑到异步 I/O 可能会引入的竞争条件和死锁问题。

异步 I/O 和非阻塞 I/O 都是一种非阻塞的 I/O 处理方式,它们都允许应用程序在等待 I/O 完成时继续执行其他任务。但是,它们之间有几个主要的区别:

实现方式:异步 I/O 由操作系统内核来处理 I/O 操作,并使用通知机制将 I/O 完成的事件通知给应用程序。而非阻塞 I/O 则需要应用程序在发起 I/O 请求后轮询检查 I/O 是否完成。接口:异步 I/O 通常使用操作系统提供的系统调用,如 aio_read() 和 aio_write() 等,应用程序发起 I/O 请求后可以立即返回。而非阻塞 I/O 则需要使用相应的系统调用,并将文件描述符设置为非阻塞模式,这样在进行 I/O 操作时可以立即返回。编程模型:异步 I/O 通常使用事件驱动编程模型,应用程序使用回调函数处理 I/O 完成的事件。而非阻塞 I/O 则需要应用程序自己实现轮询检查 I/O 完成的逻辑。

异步 I/O 与非阻塞 I/O 都是一种提高 I/O 性能的方式,但它们在实现方式、接口和编程模型上有所不同。异步 I/O 的实现更加复杂,需要操作系统内核来处理 I/O 操作和通知,但它可以让应用程序更加高效地处理 I/O 事件。而非阻塞 I/O 的实现相对简单,但需要应用程序自己实现轮询检查 I/O 完成的逻辑,较为繁琐。

总结

阻塞IO实现简单,但是性能不佳,非阻塞IO虽然不需要阻塞线程,但是他需要轮询操作低效。多路复用IO有三种实现,select和poll都是使用了轮询的方式,监听文件描述符不能太多。epoll的实现使用了红黑树,增删改文件描述符效率高,并且使用了事件触发机制,不需要进行轮询。信号IO依赖于信号机制,它对信号的处理需要一定的时间,因此它不适合高速 I/O 操作。异步 I/O 模型的实现比同步 I/O 模型更加复杂,需要使用操作系统提供的通知机制来处理 I/O 完成的事件,同时也需要考虑到异步 I/O 可能会引入的竞争条件和死锁问题。



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有