IO 多路复用之 poll

继续整理 poll。

前言

在上一篇文章中,我们已经学会了如何使用 select 这种 IO 多路复用方式,并且还知道了它的优缺点,其中最明显的就是它的可用文件描述符数量被系统限制到 1024 了。尽管我们可以修改这个值,但是这可能又会带来其他的问题。
那么为了更好的解决(避免麻烦)这个问题,Linux 又引入了新的 IO 多路复用方式——poll。

❗注意:poll 是 Linux 系统独有的 IO 多路复用方式。

预备知识

先了解一下与poll相关的内容。

pollfd

pollfd是一个结构体,它将文件描述符和事件组合在了一起,大概定义如下:

1
2
3
4
5
6
7
// 头文件
#include <poll.h>
struct pollfd {
int fd; /* file descriptor */
short events; /* requested events */
short revents; /* returned events */
};

其中fd表示文件描述符;events表示委托内核检测的事件类型(读、写或错误等);revents表示内核返回的事件。eventsrevents本质上是一种位掩码,所以会有一些(与事件类型对应的)常量用作与之位运算:

常量 events传入 revnets返回 说明
POLLIN 普通或优先带数据可读
POLLNORM 普通数据可读
POLLRDBAND 优先级带数据可读
POLLPRI 高优先级数据可读
POLLOUT 普通或优先级带数据可写
POLLWRNORM 普通数据可写
POLLWRBAND 优先级带数据可写
POLLERR × 发生错误
POLLHUP × 发生挂起
POLLNVAL × 描述不是打开的文件

nfds_t

nfds_t通常等同于unsigned long intsize_t,用来描述pollfd数组中需要检查的元素个数。

poll

现在我们来了解poll函数的相关内容。前面我们已经提到了,使用poll的目的之一是为了解决select的文件描述符数量限制问题,但对于poll来讲,这个限制其实还是存在的,因为系统的硬件性能不是无穷大的。
回到poll的工作机制和使用方法上来。poll的工作机制与select类似,也需要委托内核检测文件描述符状态,只是保存文件描述符的结构由fd_set换成了pollfd数组。同样的,在poll的整个工作过程中,依然需要将代表文件描述符的数据拷贝到内核中委托内核进行检测,所以这部分拷贝的造成的额外性能开销是依然存在的。尽管在使用新的数据结构pollfd后,poll的使用方法和select略有不同,但本质差异不大。

对应的函数原型:

1
2
3
// 头文件
#include <poll.h>
int poll(struct pollfd *fds, nfds_t nfds, int timeout);

参数含义:
fds:输入传入需要检测的pollfd集合,输出返回检测完的pollfd集合。
nfdsfds中需要检测的元素个数,其实就是遍历结构体数组时的右边界下标。
timeout:指定poll在没有任何事件发生时应该阻塞的最大毫秒数,0表示立即返回,不阻塞;-1,表示无限期阻塞,直到至少一个被监视的文件描述符发生事件;>0表示最多等待timeout毫秒,超时后立即返回,即使没有事件发生。

返回值:返回值大于0时,表示已就绪的文件描述符总数,即revents字段被设置为非零值的pollfd元素数量;等于0时,表示在指定timeout时间结束前没有发生任何时间;等于-1,表示调用出错,并设置 errno。

fds是一个传入传出参数

server

好了,与poll相关的内容基本介绍完毕,现在考虑如何使用它。

ver1

先回顾一下使用select的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
#include <stdio.h>
#include <ctype.h>
#include <string.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/select.h>

int main() {
// 创建 socket
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd == -1) {
perror("socket error");
return -1;
}

// 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(fd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}

// 监听客户端请求
ret = listen(fd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}

// 创建用于读信号的文件描述符集合
fd_set rdset;
FD_ZERO(&rdset);
// 添加用于监听的文件描述符
FD_SET(fd, &rdset);
int maxfd = fd;
while(1) {
fd_set tmpset = rdset;
// select 会修改传入的 fd_set 所以传入的是临时 fd_set
int ret = select(maxfd + 1, &tmpset, NULL, NULL, NULL);
// 判断用于监听的文件描述符是否有读信号
if(FD_ISSET(fd, &tmpset)) {
// 不需要客户端信息时,省略 accept 函数的参数
int cfd = accept(fd, NULL, NULL);
// 将用于通信的文件描述符添加到 fd_set 中
FD_SET(cfd, &rdset);
// 重新判断 maxfd 的大小
maxfd = cfd > maxfd ? cfd : maxfd;
}
char buf[1024] = {0};
// 接收客户端消息
for(int i = 3; i <= maxfd; ++i) {
if(i != fd && FD_ISSET(i, &tmpset)) {
int len = recv(i, buf, sizeof(buf), 0);
if(len > 0) {
printf("从客户端接收:%s, 文件描述符:%d\n", buf, i);
printf("len = %d\n", len);
printf("buf[0] = %d\n", buf[0]);
// 向客户端发送消息
// 将从客户端接收到的小写字符转换为大写后发送给客户端
for(int k = 0; k < len; ++k) {
buf[k] = toupper(buf[k]);
}
printf("发送给客户端:%s, 文件描述符:%d\n", buf, i);
ret = send(i, buf, len, 0);
if(ret == -1) {
perror("send error");
break;
}
} else if(len == 0) {
printf("client closed\n");
FD_CLR(i, &rdset); // 清除断开连接的文件描述符
close(i); // 关闭断开的文件描述符
} else {
perror("recv error");
FD_CLR(i, &rdset); // 清除断开连接的文件描述符
close(i); // 关闭断开的文件描述符
}
}
}
}

close(fd);
return 0;
}

ver2

现在我们再使用poll替换select

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <stdio.h>
#include <ctype.h>
#include <string.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <poll.h>

int main() {
// 创建 socket
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd == -1) {
perror("socket error");
return -1;
}

// 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(fd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}

// 监听客户端请求
ret = listen(fd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}

// 创建 pollfd 集合
struct pollfd fds[1024];
// 初始化
for(int i = 0; i < 1024; ++i) {
fds[i].fd = -1;
fds[i].events = POLLIN; // 默认检测读事件
}
// 放入用于监听的文件描述符
fds[0].fd = fd;
int fds_idx = 0;
while(1) {
ret = poll(fds, fds_idx + 1, -1);
if(ret == -1) {
perror("poll error");
return -1;
}

// 用于监听的文件描述符有 POLLIN 事件
if(fds[0].revents & POLLIN) {
// 获得用于通信的文件描述符
int cfd = accept(fds[0].fd, NULL, NULL);
// 添加到 pollfd 数组中
for(int i = 0; i < 1024; ++i) {
if(fds[i].fd == -1) {
fds[i].fd = cfd;
fds_idx = i > fds_idx ? i : fds_idx;
break;
}
}
}
// 检测用于通信的文件描述符是否有事件
for(int i = 1; i <= fds_idx; ++i) {
// 加入数组的 pollfd 检测到 POLLIN 事件
if(fds[i].fd != -1 && fds[i].revents & POLLIN) {
char buf[1024];
// 接收客户端消息
int len = recv(fds[i].fd, buf, sizeof(buf), 0);
if(len > 0) {
printf("从客户端接收:%s, 文件描述符:%d\n", buf, fds[i].fd);
printf("len = %d\n", len);
printf("buf[0] = %d\n", buf[0]);
// 将从客户端接收到的小写字符转换为大写后发送给客户端
for(int k = 0; k < len; ++k) {
buf[k] = toupper(buf[k]);
}
printf("发送给客户端:%s, 文件描述符:%d\n", buf, fds[i].fd);
int ret = send(fds[i].fd, buf, len, 0);
if(ret == -1) {
perror("send error");
close(fds[i].fd);
fds[i].fd = -1;
break;
}
} else if(len == 0) {
printf("client closed\n");
close(fds[i].fd);
fds[i].fd = -1;
} else {
perror("recv error");
close(fds[i].fd);
fds[i].fd = -1;
}

}
}
}

close(fd);
return 0;
}

在上述代码中,我们使用poll替换掉了select来处理并发请求,可以看出poll的使用方式与select十分类似,但也需要注意一些细节:

  1. 创建的pollfd数组的大小是 1024,这意味着这里调用poll处理的文件描述符个数是 1024 个,所以这段代码的连接数上限与select是一致的。
  2. 用于监听的文件描述符被我们放在了数组下标为 0 的位置,所以后续的遍历可以跳过这个元素。
  3. 就遍历文件描述符的方式而言,pollselect是一致的,都是线性时间复杂度$O(n)$,而且遍历过程中很可能会遍历到无效元素。

另外,在前面的文章中,我们尝试了多线程和select组合在一起的方式,这里就不再尝试组合poll和多线程了,原因主要有两点:

  1. 本来 IO 多路复用的目的就是为了单线程处理并发问题,再组合多线程就显的有点多余了(不过可能在某些业务场景下会用到)。
  2. poll的使用场景并不多,因为还有更好的替代方案——epoll

summary

最后,我们总结一下。

poll的基本使用方法大概就是这样,与select是十分相似的。不过还有一个问题没有解决,那就是前面提到了,poll的出现解决了select文件描述符上限的问题,但在 ver2 这个版本的代码中并没有体现出来,下面我们分析一下。

首先是select对于select来说,其 1024 个文件描述符的限制是由fd_set决定的(因为在使用select函数时,我们只能传入这个数据结构委托内核帮助我们判断文件描述符的状态)。
在 POSIX 标准中定义了一个名为FD_SETSIZE的宏,这个宏决定了fd_set的大小。在大部分系统中,FD_SETSIZE都是 1024,所以fd_set的大小一般也是 1024。如果我们想要改动fd_set的大小,以让select支持更多连接,就必须要修改FD_SETSIZE这个宏,这就意味着改完后需要重新编译内核,这是比较麻烦的事情。

再回到poll上,poll所使用的数据结构是pollfd,传给poll的是pollfd的结构体数组,对应的数组大小是由用户自定义的,这也就意味着用户需要定义多大就定义多大!但也要注意三个方面的问题:

  1. 这个数组不能定义在栈上,因为操作系统为每个进程分配的栈空间大小是固定的,只有通过堆动态申请的内存才能尽可能大。在 ver2 版本的代码中,定义的pollfd数组是在栈上的,所以空间只定义为 1024。
  2. 这个数组的大小也与操作系统位数有关,因为 32 位的操作系统的内存空间最大只能到 4G,而 64 位操作系统的内存空间最大是 16EB。
  3. 在解决完内存的问题后,我们还需要让操作系统放开对单进程文件描述符的限制,这可以通过一些命令行命令或系统调用来完成。

现在我们在考虑一下pollselect的共同点和差异点。

🙋‍♂️共同点

  1. 用途:二者都是 IO 多路复用方式,用来高效监控多个文件描述符的状态变化。
  2. 阻塞模式:二者在阻塞模式下,都会轮询文件描述符集合,效率较低。
  3. 工作机制:二者都是在同步的处理并发请求,都是基于事件驱动的。
  4. 无源触发:如果一个描述符已经就绪(例如,有数据可读),只要该状态​​持续存在​​,后续调用select/poll都会立即返回并再次通知应用程序这个描述符是可读的,直到应用程序读取了所有可用数据导致其变为未就绪状态为止。
  5. 效率:都是线性时间复杂度$O(n)$内轮询所有的文件描述符。在不同的场景下,二者效率可能会有差异。
  6. 内核拷贝开销:不管是fd_set,还是pollfd数组,内核都会直接拷贝用户传入的参数。

🤦‍♂️差异点

差异点 select poll
数据结构 fd_set pollfd
文件描述符限制 一般为 1024 无要求
并发数量 小于 1024 理论上无限
事件类型 可读、可写和异常 更丰富的事件类型
可移植性 Windows/Linux 都支持 Linux 独有

好了,与poll相关的内容就到此为止了。


Buy me a coffee ? :)
0%