I/O多路复用之 epoll 系统调用

I/O多路复用除了之前我们提到的selectpoll外,epoll 也可以检查多个文件描述符的就绪状态,以达到I/O多路复用的目的。
epoll 系统调用是 Linux 系统专有的,在 Linux 内核 2.6 版本新增,epoll 的主要优点有:

  • 当检查大量的文件描述符时,epoll 的性能比selectpoll高很多
  • epoll 既支持水平触发也支持边缘触发,selectpoll只支持水平触发

epoll 编程接口的核心数据结构为 epoll 实例,它和一个打开的文件描述符相关联。这个文件描述符是内核数据结构的句柄,该内核数据结构的作用主要有两个:

  • 记录在进程中声明过的感兴趣的文件描述符列表,即 interest list
  • 维护处于I/O就绪状态中文件描述符列表,即 ready list

其中,ready list 是 interest list 的子集。

epoll 编程接口由以下3个系统调用组成:

  • epoll_create创建一个 epoll 实例,返回代码该实例的文件描述符
  • epoll_ctl增删改 epoll 实例的 interest list
  • epoll_wait返回与 epoll 实例相关联的就绪列表中的成员

创建 epoll 实例: epoll_create

系统调用epoll_create创建一个新的 epoll 实例,其对应的 interest list 初始化为空。

1
2
#include <sys/epoll.h>
int epoll_create(int size);

参数size指定了我们想要通过 epoll 实例来检查的文件描述符个数,该参数并不是一个上限,而是告诉内核应该如何为内部数据结构划分初始大小。epoll_create返回新创建 epoll 实例的文件描述符,这个文件描述符在其他几个 epoll 系统调用中会被用来表示 epoll 实例。当这个文件描述符不再使用时,应该通过close来关闭。

从 Linux 2.6.27 版内核以来,Linux 支持了一个新的系统调用 epoll_create1。该系统调用执行的任务同epoll_create,但是去掉了无用的参数size,并增加了一个可用来修改系统调用行为的flag标志。

修改 epoll 实例: epoll_ctl

系统调用epoll_ctl能够修改由文件描述符epfd所代表的 epoll 实例中的 interest list。

1
2
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *ev);
  • 参数epfd指定 epoll 实例的文件描述符,即对哪个 epoll 实例进行操作
  • 参数fd指明要修改 interest list 中的哪一个文件描述符。
  • 参数op用来指定需要执行的操作,下文我们还会对op操作类型进行进一步描述
  • 参数ev是指向结构体epoll_event的指针,关于结构体epoll_event的定义,我们也在下文描述

epoll_ctlop支持的操作包括以下以种:

  • EPOLL_CTL_ADD
    将描述符fd添加到 epoll 实例的 interest list 中去。对于fd上我们感兴趣的事件,在ev所指向的结构体中指定。

  • EPOLL_CTL_MOD
    修改描述符fd上设定的事件,需用到由ev所指向的结构体中的信息。

  • EPOLL_CTL_DEL
    将描述符fd从 epoll 实例的 interest list 中移除,该操作忽略ev参数。

上面我们多处提到了evev是指向结构体epoll_event的指针,该结构体的定义如下:

1
2
3
4
struct epoll_event {
uint32_t events; // epoll 事件
epoll_data data; // 用户数据
};

结构体epoll_event中的data字段的类型为epoll_data,其定义以下:

1
2
3
4
5
6
typedef union epoll_data {
void *ptr; // 用户自定义数据的指针
int fd; // 文件描述符
uint32_t u32; // 32位整型
uint64_t u64; // 64位整型
} epoll_data_t;

参数ev为文件描述符fd所做的设置如下:

  • 结构体epoll_event中的events字段是一个位掩码,它指定了 epoll 实例监控的事件集合
  • data字段是一个联合体,当fd就绪时,联合体的成员可用来指定传回给调用进程的信息

就绪等待: epoll_wait

系统调用epoll_wait返回 epoll 实例中处于就绪状态的文件描述符的信息。单个epoll_wait调用能返回多个就绪态文件描述符的信息,这也正是I/O多路复用的体现。

1
2
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *evlist, int maxevents, int timeout);
  • 参数evlist所指向的结构体数组中返回就绪状态文件描述符的信息。数据evlist的空间由调用者负责申请,所包含的元素个数在参数maxevents中指定。
  • 参数timeout指定epoll_wait的阻塞行为,例如timeout等于-1,调用将一直阻塞,走到 interest list 中的文件描述符上有事件产生。

epoll_wait 调用成功后,返回数据evlist中的元素个数,即就绪的描述符个数。

例子

我们以编写一个 TCP 服务器为例子,说明 epoll 的用法,该 TCP 服务器打印出所有接收到的消息。
我们先来看创建和绑定 TCP 监听套接字的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
static int
create_and_bind (char *port)
{
struct addrinfo hints;
struct addrinfo *result, *rp;
int s, sfd;

memset (&hints, 0, sizeof (struct addrinfo));
hints.ai_family = AF_UNSPEC; // 支持 IPv4 和 IPv6
hints.ai_socktype = SOCK_STREAM; // TCP socket
hints.ai_flags = AI_PASSIVE; // 监听套接字

s = getaddrinfo (NULL, port, &hints, &result);
if (s != 0)
{
fprintf (stderr, "getaddrinfo: %s\n", gai_strerror (s));
return -1;
}

for (rp = result; rp != NULL; rp = rp->ai_next)
{
sfd = socket (rp->ai_family, rp->ai_socktype, rp->ai_protocol);
if (sfd == -1)
continue;

s = bind (sfd, rp->ai_addr, rp->ai_addrlen);
if (s == 0)
{
// 已成功绑定套接字
break;
}

close (sfd);
}

if (rp == NULL)
{
fprintf (stderr, "Could not bind\n");
return -1;
}

freeaddrinfo (result);

return sfd;
}

create_and_bind接受port参数(表示监听的端口),其作用是创建并绑定监听套接字。
getaddrinfo函数既可以用于IPv4,也可以用于IPv6,能够处理名字到地址以及服务到端口这两种转换,它返回addrinfo结构体数组的指针。关于getaddrinfo详细介绍,可以参考《UNIX网络编程》的有关描述。
create_and_bind返回结构体addrinfo数组的指针(保存在reslut指针中)接下来,我们对result进行遍历,直到将监听套接字成功绑定为止。

接下来,我们再来看将一个套接字设置为非阻塞套接字的函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
static int
make_socket_non_blocking (int sfd)
{
int flags, s;

flags = fcntl (sfd, F_GETFL, 0);
if (flags == -1)
{
perror ("fcntl");
return -1;
}

flags |= O_NONBLOCK;
s = fcntl (sfd, F_SETFL, flags);
if (s == -1)
{
perror ("fcntl");
return -1;
}

return 0;
}

最后我们来看下main函数的实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
int
main (int argc, char *argv[])
{
int sfd, s;
int efd;
struct epoll_event event;
struct epoll_event *events;

if (argc != 2)
{
fprintf (stderr, "Usage: %s [port]\n", argv[0]);
exit (EXIT_FAILURE);
}

sfd = create_and_bind (argv[1]);
if (sfd == -1)
abort ();

s = make_socket_non_blocking (sfd);
if (s == -1)
abort ();

s = listen (sfd, SOMAXCONN);
if (s == -1)
{
perror ("listen");
abort ();
}

efd = epoll_create1 (0);
if (efd == -1)
{
perror ("epoll_create");
abort ();
}

event.data.fd = sfd;
// ET 模式
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, sfd, &event);
if (s == -1)
{
perror ("epoll_ctl");
abort ();
}

// 用来存储epoll_wait返回的就绪文件描述符列表
events = calloc (MAXEVENTS, sizeof event);

// 主循环
while (1)
{
int n, i;

n = epoll_wait (efd, events, MAXEVENTS, -1);
for (i = 0; i < n; i++)
{
if ((events[i].events & EPOLLERR) ||
(events[i].events & EPOLLHUP) ||
(!(events[i].events & EPOLLIN)))
{
// 监测的文件描述符出错了
fprintf (stderr, "epoll error\n");
close (events[i].data.fd);
continue;
}

else if (sfd == events[i].data.fd)
{
// 监听套接字就绪,表明有一个或者多个连接进来
while (1)
{
struct sockaddr in_addr;
socklen_t in_len;
int infd;
char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

in_len = sizeof in_addr;
infd = accept (sfd, &in_addr, &in_len);
if (infd == -1)
{
if ((errno == EAGAIN) ||
(errno == EWOULDBLOCK))
{
// 处理完所有的连接
break;
}
else
{
perror ("accept");
break;
}
}

s = getnameinfo (&in_addr, in_len,
hbuf, sizeof hbuf,
sbuf, sizeof sbuf,
NI_NUMERICHOST | NI_NUMERICSERV);
if (s == 0)
{
printf("Accepted connection on descriptor %d "
"(host=%s, port=%s)\n", infd, hbuf, sbuf);
}

// 设置已连接套接字为非阻塞,并且加入到 epoll 实例监测中
s = make_socket_non_blocking (infd);
if (s == -1)
abort ();

event.data.fd = infd;
// ET 模式
event.events = EPOLLIN | EPOLLET;
s = epoll_ctl (efd, EPOLL_CTL_ADD, infd, &event);
if (s == -1)
{
perror ("epoll_ctl");
abort ();
}
}
continue;
}
else
{
// 已连接套接字可读,我们读取该套接字所有的数据并打印出来
// 由于使用了 ET 模式,我们必须将所有可读数据读取完毕
int done = 0;

while (1)
{
ssize_t count;
char buf[512];

count = read (events[i].data.fd, buf, sizeof buf);
if (count == -1)
{
// 如果 errno == EAGAIN,说明所有数据已读取完毕
// 如果 errno != EAGAIN,说明读取出错
if (errno != EAGAIN)
{
// 读取出错
perror ("read");
done = 1;
}
break;
}
else if (count == 0)
{
// 客户端断开了连接
done = 1;
break;
}

// 打印到标准输出
s = write (1, buf, count);
if (s == -1)
{
perror ("write");
abort ();
}
}

if (done)
{
printf ("Closed connection on descriptor %d\n",
events[i].data.fd);

// 关闭连接
close (events[i].data.fd);
}
}
}
}

free (events);

close (sfd);

return EXIT_SUCCESS;
}

main函数首先调用create_and_bind创建并绑定监听套接字,接下来调用make_socket_non_blocking设置监听套接字为非阻塞模式,并调用listen系统调用监听客户端的连接请求。
接下来,我们创建了一个 epoll 实例,并将监听套接字加入到该 epoll 实例的 interest list,当监听套接字可读时,说明有新的客户端请求连接。
在主循环中,我们调用epoll_wait等待就绪事件的发生。timeout参数设置为-1说明主线程会一直阻塞到事件就绪。这些就绪事件包括以下类型:

  • 客户端请求到达:当监听套接字可读时,说明一个或者多个客户端连接请求到达,我们设置新的已连接套接字为非阻塞模式并添加到 epoll 实例的 interest list 中。
  • 客户端数据可读:已连接套接字就绪时,说明客户端数据可读。我们使用read每次读出512字节的数据,直接所有的数据读取完毕。这是由于我们使用了 ET 模式,ET 模式对于数据可读只会通知一次。读出的数据通过write系统调用打印到标准输出。

完整的程序可以在这里下载:epoll_example.c

参考资料

  1. https://banu.com/blog/2/how-to-use-epoll-a-complete-example-in-c/
  2. Linux/UNIX 系统编程手册,Michael Kerrisk 著,郭光伟译,人民邮电出版社
  3. UNIX网络编程,卷1:套接字联网API,第3版,人民邮电出版社
  4. http://blog.lucode.net/linux/epoll-tutorial.html