创作不易,转载请说明出处(大家有想了解的原理,也可以直接私聊告诉我,作者会抽空进行详细描述)
Epoll函数用于实现Java和众多语言底层的IO多路复用,我们可以创建一个epoll对象,然后将我们的fd(File Descriptor 文件描述符对象)注册到epoll对象中,之后我们可以通过调用epoll对象的查询函数来获取准备好事件的fd对象(Java Coder可以参考下Selector、Channel的使用方式,C Coder不用我介绍了吧)。本文用于详细解释在Linux2.6版本时,Epoll的三大函数的源码与原理分析,详细说明其中的设计技巧。Linux 现在发展到了非常高的版本,本文选取最开始的Epoll实现,此版本没有经过任何优化的修改,最为纯粹,方便读者理解。后面有时间再详细的编写高版本对于Epoll的优化,包括红黑树的引入等等。本文稍长,需静下心来理解之。
三大函数原型epoll_create函数
该函数用于创建一个代表Epoll对象的fd。我们知道Linux继承了Unix的优良血统,将一切均视为文件对象处理,这里的文件有两层含义:真实的磁盘上的文件、虚拟文件。而我们这里的Epoll便是一个虚拟文件对象。该函数的参数定义如下:
__size参数:用于提供给内核一个提示,当前需要监听的fd个数,具体怎么做由内核来处理
返回值:代表当前Epoll对象的fd
epoll_ctl函数
该函数用于控制上面我们由epoll_create方法创建的Epoll 对象。我们可以通过该函数操作Epoll 实现添加、修改、删除监听的fd对象。参数定义如下:
__epfd:表示我们通过epoll_create方法创建的Epoll 对象
__op:表示操作类型:EPOLL_CTL_ADD(添加监听的fd)、EPOLL_CTL_DEL(删除监听的fd)、EPOLL_CTL_MOD(修改监听的fd)
__fd:表示需要操作目标的fd对象
__event:表示用于描述需要监听的fd对象的感兴趣事件类型
返回值:0表示成功,-1表示失败
epoll_wait函数
该函数用于获取Epoll对象监听的fd对象列表,我们可以通过该参数实现我们通过epoll_ctl添加到Epoll对象中监听的且准备好事件的fd对象。该函数的参数定义如下:
__epfd:表示我们通过epoll_create方法创建的Epoll 对象
__events:表示用于接收准备好事件的fd对象的缓冲区
__maxevents:表示这一次调用可以接收多少准备好的fd对象,通常我们将该参数设置为events参数的长度
__timeout:表示如果没有准备好的事件对象,那么等待多久返回
返回值:返回events缓冲区中有效的fd个数,也即准备好事件的fd个数
epoll_event与epoll_data对象
epoll_data结构用于保存用户数据,epoll_event结构用于表示监听的fd对象的事件和用户数据。这里读者可以很容易的理解:Epoll需要监听fd并且需要指定监听的事件类型,而这时我们就需要epoll_event数据载体,同时我们可能需要设置一些与之关联的数据,比如fd对象,用户自己的数据ptr等等(Java Coder 可以参考下Selector和Channel绑定时可以指定Attachment一样)。详细描述如下。
边缘触发与水平触发
Epoll对象对于监听的fd处理方式有两种:边缘触发(edge-triggered)简称ET、 水平触发(level-triggered)简称LT。在ET模式下,当我们通过epoll_wait函数获取到准备好的事件后,如果没有处理完成所有事件,那么再次调用epoll_wait函数将不会再次返回该fd,而LT模式下如果fd的事件没有处理完成,那么在下一次调用epoll_wait函数时将会返回该fd。我们用一个例子来说明:
首先我们将一个读取数据的rfd注册到epoll对象中
然后我们往这个rfd中写入2KB的数据
当我们调用epoll_wait函数时,由于此时有2KB的数据可以读取,那么将会返回这个已经准备好事件的rfd
接着我们读取rfd中1KB的数据,此时注意rfd中还有1KB没有处理
然后我们再次调用epoll_wait函数
此时,如果我们使用添加rfd时指定EPOLLET,也即ET模式时,那么尽管rfd中还有1KB的数据可以读取,但是用于我们处于ET模式,此时再次调用epoll_wait函数将会阻塞当前线程,因为ET模式需要用户自己处理完当前事件的数据。当然如果我们使用EPOLLLT,那么此时再次调用epoll_wait函数那么将会返回剩余1KB读取的rfd对象。
使用示例
我们这里以一个服务端监听连接事件的listen_sock fd为例来说明Epoll如何使用。读者可以从例子中看到,其实就是创建Epoll对象,使用提供的三个函数进行CRUD(Java Coder可以类比NIO的ServerScoketChannel和ScoketChannel来学习,因为操作方式一模一样,只不过屏蔽了Epoll的细节)。
小结
我们看到epoll_create函数用于创建对象,而epoll_ctl函数用于对Epoll对象增删改操作,epoll_wait用于对Epoll对象实现查询操作。epoll_event与epoll_data对象用于承载与Epoll对象交互的数据结构。而对于ET和LT触发而言,我们知道ET模式一个事件只会触发一次,如果在该事件中的数据没有处理完毕,那么再下一个事件到来时,将不会再次返回监听的fd。而对于LT而言如果数据没有处理完毕,那么我们将可以再次调用epoll_wait函数处理未处理完成数据的fd。
三大函数内核原理
在前面了解了如何使用Epoll后,本节将会从三个函数的Linux内核源码进行原理讲解。因为Linux中一切皆文件,但是由于文件系统是另外一个模块,本文肯定不会花大量篇幅介绍文件系统相关的概念,毕竟那样就跑题了,但是由于读者的水平参差不齐,还是有必要介绍一下VFS的相关概念。我们前面说到过Linux中一切皆文件,而对于文件来说有在磁盘上真实存在的文件和虚拟文件,比如Epoll文件。为了兼容这两者,Linux提出了VFS的虚拟文件系统,用其作为访问文件系统的抽象层,而对于应用而言只需要面对VFS即可,而不需要知道底层是虚拟文件还是真实文件,反正一切皆文件,按文件形式来处理即可。这时就引入了file结构体,该结构体代表了一个文件对象,而inode结构体用于表示文件的元数据信息,也称之为index node 索引节点。每个进程都需要打开文件,而这时就等同于创建了一个file对象,而我们不可能把file对象暴露给用户空间,读者一定要记住:内核永远不信任用户空间的数据,这就意味着从用户空间过来的数据都需要拷贝到内核中才能使用(当然对于基本数据类型变量我们只需要强转为不会造成内核瘫痪的数据即可,比如下面的size变量)。而对于从内核到用户空间的数据,也需要拷贝,同时不会将内核的数据结构暴露给用户空间,这时就需要一个映射,将file文件对象映射到一个整形变量,将这个整形变量返回给用户空间,而用户空间如果要操作file对象,那么传入该fd,内核就可以反映射到file文件对象操作即可,这个映射信息也即保存在进程的PCB控制块中,对于Linux而言就是task_struct(在不久将来,作者将会基于该版本详细介绍进程管理模块,读者这里了解下即可)。
sys_epoll_create原理
该函数用于创建epoll对象同时返回fd。我们看到首先通过ep_get_hash_bits函数计算出hashbits变量,随后调用ep_getfd函数创建一个Epoll对象,并且将其与Epoll fd关联。最后调用ep_file_init函数初始化epoll对象。对于Java Coder而言,可能不太熟悉goto语句,但是还是需要习惯一下,这里我们看到定义了两个退出点:eexit_2、eexit_1,分别用于在不同错误下进行返回执行清理工作或者打印错误信息。详细描述如下(再次强调一下:Linux中一切皆文件,Epoll对象也是文件)。
<pre data-lang="text/x-csrc@clike@C" codecontent="long sys_epoll_create(int size){
int error, fd;
unsigned int hashbits;
struct inode *inode;
struct file *file;
// 根据传入的hint size(提示大小)计算出hash位数
hashbits = ep_get_hash_bits((unsigned int) size);
// 创建一个Epoll对象,并且将其与Epoll fd关联
error = ep_getfd(&fd, &inode, &file);
if (error)
goto eexit_1;
// 初始化Epoll对象
error = ep_file_init(file, hashbits);
if (error)
goto eexit_2;
return fd; // 返回fd
eexit_2:
sys_close(fd);
eexit_1:
DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_create(%d) = %dn",
current, size, error));
return error;
}" class=" language-clike">
ep_getfd函数原理
该函数用于初始化efd指针、einode指针、efile指针,分别表示为Epoll对象的fd、Epoll对象的文件的元数据对象、文件对象,同时将efd与dfile文件对象进行了关联。由于篇幅有限,并且又会涉及到内存模块相关的知识,所以作者这里并没有将ep_eventpoll_inode、d_alloc等等这些与VFS相关的内容展开,因为那并不属于Epoll的研究范畴了,我们只需要从该函数中注意到:file->f_op = &eventpoll_fops这行代码即可。我们知道一个文件是需要操作的,而这些操作的函数便在eventpoll_fops结构中,当我们调用文件对象的这些函数时,由于我们设置了地址为eventpoll_fops,所以将会调用这里面的函数进行操作。对于Java Coder而言,这就是C语言中的接口。我们定义了一堆接口方法,那么需要实现,而eventpoll_fops就是实现。对于C语言而言并没有接口的概念,不过C语言有指针,我们设置函数指针指向不同的函数便实现了抽象的过程,这也是VFS的核心。详细实现如下。
<pre data-lang="text/x-csrc@clike@C" codecontent="static int ep_getfd(int *efd, struct inode **einode, struct file **efile)
{
struct qstr this;
char name[32];
struct dentry *dentry; // Epoll文件目录对象
struct inode *inode; // Epoll索引节点对象
struct file *file; // Epoll文件对象
int error, fd;
error = -ENFILE;
file = get_empty_filp(); // 获取一个空的文件对象,在这里面我们就会进行max_files打开的最大文件对象的限制检测,同时分配Epoll文件对象file
if (!file)
goto eexit_1;
inode = ep_eventpoll_inode(); // 接着我们分配Epoll索引节点
error = PTR_ERR(inode);
if (IS_ERR(inode))
goto eexit_2;
// 然后从进程的files_struct结构中获取一个空闲的fd,在这里面我们就需要进行进程打开的最大文件描述符限制的检测了,在当前版本中的限制为:#define INR_OPEN 1024
error = get_unused_fd();
if (error < 0)
goto eexit_3;
fd = error;
error = -ENOMEM;
sprintf(name, "[%lu]", inode->i_ino);
this.name = name;
this.len = strlen(name);
this.hash = inode->i_ino;
dentry = d_alloc(eventpoll_mnt->mnt_sb->s_root, &this); // 分配目录节点
if (!dentry)
goto eexit_4;
dentry->d_op = &eventpollfs_dentry_operations; // 设置目录操作
d_add(dentry, inode);
// 设置file对象文件属性
file->f_vfsmnt = mntget(eventpoll_mnt);
file->f_dentry = dget(dentry);
file->f_pos = 0;
file->f_flags = O_RDONLY;
file->f_op = &eventpoll_fops; // 我们关注这里即可,初始化了文件对象的基础操作回调函数
file->f_mode = FMODE_READ;
file->f_version = 0;
file->private_data = NULL;
// 将fd与file对象进行关联,读者这里就把fd当成数组下标,然后数组中的对象为file即可
fd_install(fd, file);
// 将efd、einode、efile指针指向前面分配的三大结构的地址
*efd = fd;
*einode = inode;
*efile = file;
return 0;
eexit_4:
put_unused_fd(fd);
eexit_3:
iput(inode);
eexit_2:
put_filp(file);
eexit_1:
return error;
}
// Epoll文件操作的函数结构体
static struct file_operations eventpoll_fops = {
.release = ep_eventpoll_close,
.poll = ep_eventpoll_poll
};
" class=" language-clike">
ep_file_init函数原理
我们在ep_getfd函数中看到的都是VFS相关的对象,而我们说Epoll文件是一个虚拟文件,那么虚拟文件就需要一个数据载体来表示Epoll结构,而eventpoll结构,便是这样一个用来实际操作Epoll的核心数据结构之一。我们看到首先分配一个eventpoll结构的空间,随后调用ep_init函数初始化该结构,为了保持简单明了,作者这里就不展开该方法讲解linux操作系统怎么样,我们在下一小节直接看该结构的数据定义即可。详细描述如下。
eventpoll结构原理
该结构存放在efile文件对象的private_data中,再次强调一遍:Linux一切皆文件linux 调用内核函数,此时需要真实的文件载体,而该结构就是这个载体,可能读者会问如果不介入VFS,是不是直接用该结构就可以了?答案是的。详细的参数描述如下。
<pre data-lang="text/x-csrc@clike@C" codecontent="struct eventpoll {
rwlock_t lock; // 保护该结构的读写锁
struct rw_semaphore sem; // 用于保护eventpoll文件对象的读写信号量
wait_queue_head_t wq; // sys_epoll_wait()函数时,用于保存阻塞进程的等待队列
wait_queue_head_t poll_wait; // 用于调用file->poll()函数时,阻塞进程的等待队列
struct list_head rdllist; // 已经准备好事件的fd列表
unsigned int hashbits; // 通过传入的size计算的hash位数
char *hpages[EP_MAX_HPAGES]; // 用于存放struct epitem的数据页
};" class=" language-clike">
epitem结构原理
该结构用于表示添加到Epoll中的文件信息,每一个添加到Epoll中监听的fd都会拥有这样的一个结构。
sys_epoll_ctl原理
该函数用于操作Epoll对象,对Epoll对象添加、删除、修改操作。我们看到首先通过epfd、fd取出epoll文件对象和操作的文件对象,然后进行校验,我们注意到只有支持tfile->f_op->poll操作的对象才可以用于epoll监听,随后我们根据op操作调用不同函数来操作epoll对象。同时读者应该需要知道epoll_event的事件结构中的events变量是一个整形变量,所以是按位作为标志位来开启不同的感兴趣事件,我们通过或运算符可以组合这些位。详细实现如下。
ep_insert函数原理
该函数用于将监听的tfile文件对象放入epitem中,然后将其放入对应的链表中。我们看到首先初始化epi中的链表结构,接着初始化epitem结构体变量linux 调用内核函数,然后初始化poll table的回调函数为ep_ptable_queue_proc,我们稍后就会详细介绍该回调函数的作用,以及其调用的原理。最后添加epitem到epoll的监听链表中,检测是否当前文件对象在添加到队列后,马上就发生了感兴趣事件,如果是这样,那么将其放入到epollevent的rdllink准备链表的末尾,并唤醒等待进程。详细实现如下。
<pre data-lang="text/x-csrc@clike@C" codecontent="static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd){
int error, revents, pwake = 0;
unsigned long flags;
struct epitem *epi;
struct ep_pqueue epq;
error = -ENOMEM;
if (!(epi = EPI_MEM_ALLOC())) // 分配一个新的epitem结构
goto eexit_1;
// 初始化epi中的链表结构
INIT_LIST_HEAD(&epi->llink);
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->txlink);
INIT_LIST_HEAD(&epi->pwqlist);
// 初始化epitem结构体变量
epi->ep = ep;
epi->file = tfile;
epi->fd = fd;
epi->event = *event;
atomic_set(&epi->usecnt, 1); // 设置引用计数为1
epi->nwait = 0;
// 初始化poll table的回调函数为ep_ptable_queue_proc
epq.epi = epi;
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 执行目标文件的poll函数,将会回调ep_ptable_queue_proc函数
revents = tfile->f_op->poll(tfile, &epq.pt);
if (epi->nwait < 0)
goto eexit_2;
// 添加epitem到目标文件对象的epoll hook 链表
spin_lock(&tfile->f_ep_lock);
list_add_tail(&epi->fllink, &tfile->f_ep_links);
spin_unlock(&tfile->f_ep_lock);
write_lock_irqsave(&ep->lock, flags);
// 添加epitem到epoll的监听链表中(注意我们这里使用的是基于hash表的链表,也即找到索引下标,然后链到不同的下标中,可以参考下hashmap的链地址法)
list_add(&epi->llink, ep_hash_entry(ep, ep_hash_index(ep, tfile, fd)));
// 如果此时文件已经准备好了且当前epi没有被放入到epollevent的rdllink准备链表的末尾
if ((revents & event->events) && !EP_IS_LINKED(&epi->rdllink)) {
list_add_tail(&epi->rdllink, &ep->rdllist);
// 如果有进程正在阻塞,那么唤醒进程处理该准备好的事件
if (waitqueue_active(&ep->wq))
wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait)) // 唤醒通过poll函数阻塞进程
pwake++;
}
write_unlock_irqrestore(&ep->lock, flags);
if (pwake)
ep_poll_safewake(&psw, &ep->poll_wait);
return 0;
eexit_2:
ep_unregister_pollwait(ep, epi);
write_lock_irqsave(&ep->lock, flags);
if (EP_IS_LINKED(&epi->rdllink))
EP_LIST_DEL(&epi->rdllink);
write_unlock_irqrestore(&ep->lock, flags);
EPI_MEM_FREE(epi);
eexit_1:
return error;
}" class=" language-clike">
接下来我们来看ep_pqueue结构体原理。我们看到ep_pqueue结构仅仅作为poll_table和epitem结构体的包装,而poll_table_struct结构体只是一个回调函数poll_queue_proc,我们可以看到该函数接收一个文件对象和wait_queue_head_t等待队列指针,poll_table_struct结构体,也即poll_table结构体。
接下来我们来看init_poll_funcptr(&epq.pt, ep_ptable_queue_proc)的原理。我们看到该函数仅仅是把poll_table的函数指针指向qproc。
那么来看看传入的ep_ptable_queue_proc函数的实现。我们看到首先通过poll_table地址取出epitem(读者可以想想如何获取?我们在前面看到ep_pqueue结构中包含poll_table,而这个地址的下面便是epitem,我们可以怎么做?用poll_table的地址转为ep_pqueue结构体指针,然后直接取epitem即可),然后分配一个eppoll_entry结构体,然后调用init_waitqueue_func_entry初始化eppoll_entry的变量和回到函数为ep_poll_callback为ep_poll_callback,这时相当于把eppoll_entry的wait_queue_t的变量的回调函数设置为ep_poll_callback函数入口,同时将将eppoll_entry的wait_queue_t wait 节点添加到wait_queue_head_t whead的链表中(该等待节点我们已经设置了回调函数为ep_poll_callback,当数据可用时将会回调该函数),并且将eppoll_entry添加到epitem的pwqlist链表中,此时读者是否发现:我们既可以从操作的fd的等待链接表中回调wait中的函数,又可以从epitem节点中通过遍历pwqlist链表来获取到等待信息结构eppoll_entry?所以我们可以稍微抽一下这些共同点:将结构通过list_head结构和其变种wait_queue_head_t结构关联在一起,为何关联?查询操作嘛。详细实现如下所示。
我们看到这个回调函数就是初始化一个eppoll_entry设置回调函数为ep_poll_callback然后将其添加到了目标文件对象的等待节点中,那么我们如何知道这个链表节点是谁呢?是否还有一个疑惑:是谁来回调这个ep_ptable_queue_proc函数呢?我们在上面看到最后会调用tfile->f_op->poll(tfile, &epq.pt)的poll函数,那么我们这里以网络函数来看看poll函数做了什么?我们看到对于网络Socket文件来说,当我们将其插入到Epoll后,将会通过上面的poll函数调用sock_poll,该函数进一步调用tcp_poll(因为我们假定使用TCP协议),而在tcp_poll函数中调用了poll_wait函数,该函数我们看到就是回调ep_ptable_queue_proc。详细描述如下。
作者一直说:talk is cheap ,show me the code。给出任何结论,作者都会给出论据。所以我们还有一个论据需要给出。我们说过在Socket fd中我们调用了tcp_poll函数,将eppoll_entry结构中的wait_queue_t wait结构添加到了sk->sk_sleep链表中,那么我们说会被回调其中的func函数,那么是怎样被回调呢?我们看以下代码。sock_init_data函数用于初始化sock结构,可能读者不清楚Linux的网络模块,会问:为什么会有一个socket和sock结构,这里稍微提一下,在Linux中兼容对BSD 对网络的规范,而这个socket结构就是general BSD socket结构,而我们知道,兼容是兼容,但毕竟我Linux可以有自己的socket,而这个sock就是Linux实现网络模块的结构。我们看到这里设置了多个回调函数,当sock发生对应事件时回调这些函数。我们这里以sk_state_change事件来举例,我们看到当sock的状态改变后,在sock_def_wakeup方法中调用wake_up_interruptible_all,该函数名为唤醒所有的进程,但是由于我们在Epoll添加过程中不存在进程的阻塞,所以设置了wait_queue_t的func为ep_poll_callback函数,所以最后在__wake_up_common链表遍历中调用了该回调函数。当然,sock其他的回调函数也是如此。详细描述如下。
<pre data-lang="text/x-csrc@clike@C" codecontent="void sock_init_data(struct socket *sock, struct sock *sk){
...
sk->sk_state_change = sock_def_wakeup; // sock状态改变后回调
sk->sk_data_ready = sock_def_readable; // sock数据可用时回调
sk->sk_write_space = sock_def_write_space; // sock写数据的空间可用时回调
sk->sk_error_report = sock_def_error_report; // sock发生错误时回调
...
}
// sock状态改变后回调
void sock_def_wakeup(struct sock *sk){
read_lock(&sk->sk_callback_lock); // 获取读锁
if (sk->sk_sleep && waitqueue_active(sk->sk_sleep)) // 等待队列不为空,那么调用wake_up_interruptible_all唤醒sk_sleep链表中的等待节点
wake_up_interruptible_all(sk->sk_sleep);
read_unlock(&sk->sk_callback_lock);
}
#define wake_up_interruptible_all(x) __wake_up((x),TASK_INTERRUPTIBLE, 0); // 这里的0表示非互斥唤醒,也即唤醒全部
void __wake_up(wait_queue_head_t *q, unsigned int mode, int nr_exclusive){
unsigned long flags;
spin_lock_irqsave(&q->lock, flags);
__wake_up_common(q, mode, nr_exclusive, 0); // 调用该函数完成唤醒过程
spin_unlock_irqrestore(&q->lock, flags);
}
static void __wake_up_common(wait_queue_head_t *q, unsigned int mode, int nr_exclusive, int sync){
struct list_head *tmp, *next;
list_for_each_safe(tmp, next, &q->task_list) { // 遍历等待链表wait_queue_head_t *q
wait_queue_t *curr;
unsigned flags;
curr = list_entry(tmp, wait_queue_t, task_list); // 从当前task_list地址中获取到wait_queue_t结构
flags = curr->flags;
if (curr->func(curr, mode, sync) && // 调用其设置的回调函数,这里如果是epoll,那么就是回调了ep_poll_callback函数
(flags & WQ_FLAG_EXCLUSIVE) && // 如果指定了互斥唤醒,那么使用nr_exclusive来决定唤醒多少个进程(读者一定要注意这里的标志位,因为我们在ep_poll函数中添加等待进程时,可没有指定这个标志位哟,考虑下惊群?)
!--nr_exclusive)
break;
}
}" class=" language-clike">
接下来我们就可以直接来看ep_poll_callback回调函数的原理了。
ep_remove原理
该函数用于从epoll监听fd链表中移除对应的fd。其实我们看到只不过是insert的逆向过程罢了,我们做了哪些链接,那么就需要将其从对应的链表中摘除,然后将其从epoll结构中移除,最后释放epitem所占用的内存空间。详细描述如下。
ep_modify原理
该函数用于修改监听文件对象的感兴趣事件集。我们看到首先修改了感兴趣事件集合,然后调用修改fd的poll函数,该函数将会查看一下当前是否有准备好的事件集,然后我们进一步判断当前epitem是否被移除,如果没有那么根据当前事件集和epitem的状态来选择将其从eventpoll准备好的事件链表中放入还是移除。详细描述如下。
sys_epoll_wait原理
该函数用于实现对Epoll对象进行查询的操作。我们从epoll_wait函数中知道,struct epoll_event __user *events用于存放准备好的事件集,int maxevents用于指定大小,通常等于events的数组大小,同时 int timeout用于指明超时等待时间。我们说linux内核不相信任何用户空间的数据,所以对这些参数进行了详细校验。然后获取到eventpoll对象调用ep_poll函数完成事件获取。
在ep_poll函数中,我们看到首先计算超时时间,然后检测下准备好事件链表是否为空,若为空,那么创建进程等待节点放入到eventpoll对象的wq链表中,同时设置进程状态为TASK_INTERRUPTIBLEred hat linux 下载,然后检测下当前进程没有需要处理的信号,那么调用schedule_timeout使用进程调度器来选择其他进程执行(可能读者不熟悉Linux内核,是这样的,内核不同于用户空间,我们需要手动将控制权交由进程管理模块来调度,而这里的schedule_timeout函数便是进程调度模块的核心函数,该函数将使用我们熟知的进程调度器,使用调度算法选取下一个进程完成进程切换,后面的文章作者会详细的描述该切换过程)。由此我们可以得出结论,该等待进程的返回情况有三种:由ep_poll_callback回调函数唤醒、进程传递信号唤醒、超时时间到唤醒。如果有准备好的事件发生后,我们会调用ep_events_transfer函数构建txlist并将其复制到用户指定的内存空间中。
在ep_events_transfer函数中,我们看到首先调用ep_collect_ready_items函数将rdllist中的准备好的事件取出来放入txlist中,获取的数量由maxevents指定,然后我们调用ep_send_events将txlist复制到用户指定的 struct epoll_event __user *events空间中,该函数作者没有展开,因为没有必要,这里又会涉及到Linux的内存管理原理,比如copy_to_user函数的使用方式,这已经不属于Epoll的范畴,所以读者也不需要在本篇文章了解。最后调用了ep_reinject_items函数来看看是否需要将txlist中的事件重新放入到epoll的rdllist中。
在ep_reinject_items函数中,我们看到 EP_IS_LINKED(&epi->llink) && !(epi->event.events & EPOLLET) 核心判断逻辑,该判断看看如果epitem没有被删除且设置的事件处理类型不为EPOLLET(边缘触发),那么就会将其重新放入到rdllist中,最后判断如果重放入了,那么唤醒所有等待进程。那么问题就来了:很多人把这一步当做是惊群效应,但其实并不是。这正是Epoll的高性能的体现,大伙想想:如果返回的fd数量较大,而我们使用了水平触发,一个进程处理不过来,我是否可以多个进程来唤醒处理,然后由处理函数保证互斥即可,这何尝不是高性能的体现?而我们所需要注意的惊群在这个版本中确实有,存在于我们之前介绍的ep_poll_callback回调函数和ep_poll函数中,由于在添加进程等待节点时并没有指定WQ_FLAG_EXCLUSIVE标志位,从而在前面我们介绍的__wake_up_common函数中将不会响应nr_exclusive变量,尽管我们的wake_up函数的nr_exclusive为1,但是由于没有设置标志位,所以并不响应,前面详细描述过啦。函数详细实现如下所示。
<pre data-lang="text/x-csrc@clike@C" codecontent="long sys_epoll_wait(int epfd, struct epoll_event __user *events,int maxevents, int timeout){
int error;
struct file *file;
struct eventpoll *ep;
// 最大获取事件必须大于0
if (maxevents <= 0)
return -EINVAL;
// 验证传入的存放准备好事件的内存区域合法性
if ((error = verify_area(VERIFY_WRITE, events, maxevents * sizeof(struct epoll_event))))
goto eexit_1;
error = -EBADF;
file = fget(epfd); // 根据epoll fd获取到epoll文件对象
if (!file)
goto eexit_1;
error = -EINVAL;
if (!IS_FILE_EPOLL(file)) // 检查当前epoll file对象是否为epoll文件
goto eexit_2;
ep = file->private_data; // 获取到epoll文件对象的实际承载体:eventpoll结构
error = ep_poll(ep, events, maxevents, timeout); // 调用该函数获取事件
eexit_2:
fput(file);
eexit_1:
DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_wait(%d, %p, %d, %d) = %dn",
current, epfd, events, maxevents, timeout, error));
return error;
}
// 获取准备好的事件集,如果指定了timeout,那么等待超时后返回
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,int maxevents, long timeout){
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;
// 计算超时时间。如果timeout为-1,那么超时时间戳为MAX_SCHEDULE_TIMEOUT 该值为long的最大值,否则我们根据内核的时钟频率HZ来计算,当前内核的频率为100HZ
jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ?
MAX_SCHEDULE_TIMEOUT: (timeout * HZ + 999) / 1000;
retry:
write_lock_irqsave(&ep->lock, flags);
res = 0;
if (list_empty(&ep->rdllist)) { // 如果准备好的事件链表为空那么构建当前进程的等待节点结构wait_queue_t wait,并将其添加到eventpoll的wq等待链表
init_waitqueue_entry(&wait, current);
add_wait_queue(&ep->wq, &wait); // 注意:由于这里添加节点时没有指定互斥状态WQ_FLAG_EXCLUSIVE,从而在callback唤醒时会唤醒所有等待进程
// 循环等待直到满足条件
for (;;) {
set_current_state(TASK_INTERRUPTIBLE); // 设置当前进程状态为TASK_INTERRUPTIBLE(可中断阻塞状态)
if (!list_empty(&ep->rdllist) || !jtimeout) // 如果准备好事件集此时不为空,或者超时,那么退出循环
break;
if (signal_pending(current)) { // 当前进程有需要处理的信号,那么退出并设置返回值为EINTR
res = -EINTR;
break;
}
write_unlock_irqrestore(&ep->lock, flags);
jtimeout = schedule_timeout(jtimeout); // 调用该函数让当前进程阻塞,并且将调用Linux内核的进程调度模块来选择其他进程运行
write_lock_irqsave(&ep->lock, flags);
}
// 满足条件后将当前进程从eventpoll的wq等待链表中移除,并设置TASK_RUNNING的标志位表示当前进程处于运行状态
remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
eavail = !list_empty(&ep->rdllist);
write_unlock_irqrestore(&ep->lock, flags);
// 如果当前epoll中存在准备好的事件集,那么我们调用ep_events_transfer函数将其复制到用户指定的events内存中,否则我们返回retry处继续尝试
if (!res && eavail && // 如果指定了res那么直接返回res
!(res = ep_events_transfer(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
// 将准备好的事件传输到用户指定的内存空间events中
static int ep_events_transfer(struct eventpoll *ep,struct epoll_event __user *events, int maxevents){
int eventcnt = 0;
struct list_head txlist;
INIT_LIST_HEAD(&txlist); // 初始化传输节点
down_read(&ep->sem);
// 将rdllist准备好事件集的链表节点转移到txlist链表中,最大个数为maxevents
if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {
eventcnt = ep_send_events(ep, &txlist, events); // 将txlist传输到用户空间中
ep_reinject_items(ep, &txlist); // 根据Epoll指定的LT和ET模式来选择是否重新将txlist放入到rdllist中
}
up_read(&ep->sem);
return eventcnt;
}
// 根据Epoll指定的LT和ET模式来选择是否重新将txlist放入到rdllist中
static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist){
int ricnt = 0, pwake = 0;
unsigned long flags;
struct epitem *epi;
write_lock_irqsave(&ep->lock, flags);
while (!list_empty(txlist)) { // 循环直到txlist链表为空
epi = list_entry(txlist->next, struct epitem, txlink); // 根据txlist的地址获取到所属epitem结构
EP_LIST_DEL(&epi->txlink);
if (EP_IS_LINKED(&epi->llink) && !(epi->event.events & EPOLLET) && // epitem没有被删除且设置的事件处理类型不为EPOLLET(边缘触发)
(epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {
// 那么将tx中的epitem重新放入rdllist准备好事件集链表中
list_add_tail(&epi->rdllink, &ep->rdllist);
ricnt++; // 计算重放入次数
}
}
// 如果已经重放入准备好事件集,那么唤醒所有等待节点
if (ricnt) {
if (waitqueue_active(&ep->wq))
wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
write_unlock_irqrestore(&ep->lock, flags);
if (pwake)
ep_poll_safewake(&psw, &ep->poll_wait);
}" class=" language-clike">