I/O多路复用之POLL系统调用

poll函数类似于select函数,也可以实现I/O多路复用。poll函数的声明如下:

1
2
#include <poll.h>
int poll(struct pollfd *fdarray, unsigned long nfds, int timeout);

第一个参数是指向一个结构数组第一个元素的指针。每个数组元素都是一个pollfd结构,用于指定测试某个给定描述符fd的条件。

1
2
3
4
5
struct pollfd {
int fd; // 需要测试的描述符
short event; // 对fd感兴趣的事件
short revents; // 发生在fd的事件:期待的事件或者异常情况发生
};

要测试的条件由events成员指定,poll函数在相应的revents成员中返回该描述符的状态。eventsrevents都由某个特定条件的一位或多位构成。下面表格列出了用于指定events标志以及测试revents标志的一些常值。

常值 是否作为events的输入 是否作为revents的结果 说明
POLLIN 普通或优先级带数据可读
POLLRDNORM 普通数据可读
POLLRDBAND 优先级带数据可读
POLLPRI 高优先级数据可读
POLLOUT 普通数据可写
POLLWRNORM 普通数据可写
POLLWRBAND 优先级带数据可写
POLLERR 发生错误
POLLHUP 发生挂起
POLLNVAL 描述符不是一个打开的文件

上表可以分为三个部分,第一部分是处理输入的4个常值,第二部分是处理输出的3个常值,第三部分是处理错误的3个常值。其中第三部分的3个常值不能在events中设置,只能在revents中返回。
poll识别三类数据:普通(normal),优先级带(priority band),高优先级(high priority)。

nfds参数指定被监听集合fdarray的大小。

timeout参数指定poll函数返回前等待多长的时间,其可能的取值如下表所示:

timeout值 说明
INFTIM 永远等待
0 立即返回,不阻塞进程
大于0 等待指定数目的毫秒数

当发生错误时,poll函数的返回值为-1,若经历了timeout时间后仍没有任何描述符就绪,则返回0,否则返回就绪描述符的个数,即revents成员值非0的描述符个数。

如果我们不再关心某个特定描述符,那么可以把与它对应的pollfd结构的fd成员设置成一个负值,poll函数将忽略这样的pollfd结构。

我们使用poll函数来实现一个echo服务器,其代码如下所示。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <unistd.h>
#include <errno.h>
#include <poll.h>

int main() {
int i, maxi, listenfd, connfd, sockfd;
int nready;
ssize_t n;
const int MAXLINE = 1024;
char buf[MAXLINE];
int server_len, client_len;
struct sockaddr_in server_address;
struct sockaddr_in client_address;
const int OPEN_MAX = 256;
struct pollfd client[OPEN_MAX];

// 创建套接字
listenfd = socket(AF_INET, SOCK_STREAM, 0);

// 命名套接字
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = htonl(INADDR_ANY);
server_address.sin_port = htons(6240);
server_len = sizeof(server_address);
bind(listenfd, (struct sockaddr*)&server_address, server_len);

// 创建套接字队列
listen(listenfd, 5);

client[0].fd = listenfd;
client[0].events = POLLRDNORM;
for (i = 1; i < OPEN_MAX; ++i) {
client[i].fd = -1;
}
maxi = 0; // 当前client数组正在使用的最大下标值

for ( ; ; ) {
nready = poll(client, maxi + 1, -1);
printf("poll ready, num: %d\n", nready);

// 处理新的客户连接的情况
if (client[0].revents & POLLRDNORM) {
client_len = sizeof(client_address);
connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_len);
printf("new client, fd: %d\n", connfd);
for (i = 1; i < OPEN_MAX; ++i) {
if (client[i].fd < 0) {
client[i].fd = connfd; // 保存新的客户的已连接套接字
break;
}
}
if (i == OPEN_MAX) {
perror("too many clients");
}

client[i].events = POLLRDNORM;
if (i > maxi) {
maxi = i;
}

if (--nready <= 0) {
continue;
}
}

// 检查所有已连接套接字的可读情况
for (i = 1; i <= maxi; ++i) {
if ((sockfd = client[i].fd) < 0) {
continue;
}

if (client[i].revents & (POLLRDNORM | POLLERR)) {
if ((n = read(sockfd, buf, MAXLINE)) < 0) { // 读出现异常
if (errno == ECONNRESET) {
close(sockfd);
client[i].fd = -1;
} else {
perror("read error");
}
} else if (n == 0) { // 客户关闭连接
printf("client close, fd: %d\n", sockfd);
close(sockfd);
client[i].fd = -1;
} else { // 正常读取客户数据
printf("receive client data, fd: %d, data len: %d\n", sockfd, n);
write(sockfd, buf, n);
}

if (--nready <= 0) { // 已处理完成所有的就绪事件
break;
}
}
}
}
}

程序解释如下:
(1)我们声明在pollfd结构数组中存储OPEN_MAX个元素,在上面的例子中,我们声明为256个,即进程能够打开的最大描述符数目为256个。
(2)我们把client数组的第一项用于监听套接字,其余各项用于已连接套接字(当有新的客户连接时)。maxi用于标识client数组当前正在使用的最大下标值。
(3)我们调用poll以等待新的连接或者现有连接上有数据可读。当一个新的连接被接受后,我们在client数组中查找第一个描述符成员为负的可用项。找到一个可用项后,我们把新连接的描述符保存到其中,并设置POLLRDNORM事件。
(4)在检查某个现有连接上的数据可读时,我们调用read,并根据read的返回值来做不同的处理。如果是出错或者客户断开连接,那么我们就把客户相应的fd成员设置为-1;如果是客户数据可读,那么我们返回相同的数据给客户端。

运行上面的服务器代码,然后执行客户测试代码:

$ ./client & ./client & ./client

服务器代码输出:

poll ready, num: 1
new client, fd: 4
poll ready, num: 2
new client, fd: 5
receive client data, fd: 4, data len: 1
poll ready, num: 2
client close, fd: 4
receive client data, fd: 5, data len: 1
poll ready, num: 1
new client, fd: 4
poll ready, num: 2
receive client data, fd: 4, data len: 1
client close, fd: 5
poll ready, num: 1
client close, fd: 4

参考资料

UNIX 网络编程卷1:套接字联网API(第三版), W.Richard Stevens 等著

附:客户测试程序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
/*  Make the necessary includes and set up the variables.  */
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>
#include <stdlib.h>
int main()
{
int sockfd;
int len;
struct sockaddr_in address;
int result;
char ch = 'A';
/* Create a socket for the client. */
sockfd = socket(AF_INET, SOCK_STREAM, 0);
/* Name the socket, as agreed with the server. */
address.sin_family = AF_INET;
address.sin_addr.s_addr = inet_addr("127.0.0.1");
address.sin_port = htons(6240);
len = sizeof(address);
/* Now connect our socket to the server's socket. */
result = connect(sockfd, (struct sockaddr *)&address, len);
if(result == -1) {
perror("oops: client3");
exit(1);
}
/* We can now read/write via sockfd. */
write(sockfd, &ch, 1);
read(sockfd, &ch, 1);
printf("char from server = %c\n", ch);
close(sockfd);
exit(0);
}