IO 多路复用之 epoll

继续整理 epoll 的内容。

前言

在前两篇文章中,我们已经学会了如何使用selectpoll这两种 IO 多路复用方式,并且还比较了它们之间的相同点和差异点。由此我们知道了,poll解决了select的文件操作符数量限制问题。但由于pollselect一样都是以$O(n)$的时间复杂度来检测文件描述符集合,那么在高并发的情况下,poll的处理效率就会显著下降。为了解决这个问题,Linux 又引入了epoll这种 IO 多路复用的方式。

❗注意:epoll 是 Linux 系统独有的 IO 多路复用方式。

预备知识

在正式进入epoll的内容之前,还是先预热一下。

红黑树

首先我们需要简单了解一下红黑树,因为epoll底层就维护了一个红黑树。
红黑树(Red Black Tree)是一种自平衡的二叉查找树(BST 树),相比平衡二叉树(AVL 树)而言,它的左右子树高度差可能会大于 1,所以它不是严格意义上的平衡二叉树(只能算作变体)。同时,红黑树具备二叉查找树的特征,对于普通二叉查找树的算法,红黑树同样适用。换句话说,红黑树的查找、插入和删除操作的时间复杂度均为$O(logn)$。
相比于selectpoll的线性时间复杂,$O(logn)$的效率就很快了,特别是在大数据量的情况下,不过这并不是epoll能如此高效的秘诀。
实际上,除了红黑树外,epoll底层还维护了一个就绪列表,这是一个双向链表,并且保存了所有就绪事件。​​当某个被监控的文件描述符有事件发生时,内核会立即将其对应的epoll_event结构体加入到这个就绪列表中​​。
而调用epoll_wait函数时,内核就只需要检查这个链表,若非空,就拷贝到用户提供的数组中,对应的时间复杂度是$O(m)($m$是就绪事件数)。
对比selectpoll的$O(n)$,在百万连接的情况下,$O(m)$的时间复杂度就要小很多很多了。

epoll_data_t

epoll_data_t是一个联合体,它联合了四个数据成员,这四个数据成员共同使用一块大小为 8 字节的内存,大概定义如下:

1
2
3
4
5
6
7
8
// 头文件
#include <sys/epoll.h>
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;

每个成员的含义比较直观,没什么好解释的。

epoll_event

epoll_event是一个结构体,它组合了事件和epoll_data_t这两个成员,大概定义如下:

1
2
3
4
5
6
// 头文件
#include <sys/epoll.h>
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};

poll类似,epollevents也是一种位掩码,所以也会又一些(与事件类型对应的)常量用作与之位运算:

常量 events传入 evnets返回 说明
EPOLLIN 表示监视可读事件;返回表示fd可读
EPOLLOUT 表示监视可写事件;返回表示fd可写
EPOLLRDHUP 表示监视对端关闭(或半关闭);返回表示对端关闭
EPOLLPRI 表示监视紧急数据可读;返回表示有紧急数据可读
EPOLLERR × 返回表示 fd 发生错误
EPOLLHUP × 返回表示 fd 被挂断(对端关闭连接)
EPOLLLET × 表示使用边缘触发模式,不返回
EPOLLONESHOT × 表示只监听一次事件,事件发生后需要重新添加,不返回
EPOLLWAKEUP × 用于防止系统休眠,不返回
EPOLLEXCLUSIVE × 用于避免多线程惊群,不返回

epoll

现在我们来了解epoll的相关内容。epollpollselect不同,它主要涉及三个 API 函数,分别是epoll_createepoll_ctlepoll_wait。同时,epoll还有两种工作模式:边沿触发(edge-triggered)和水平触发(level-triggered),默认使用水平触发模式。
下面我们逐一介绍epoll的 API 函数。

epoll_create

用途:创建一个epoll实例。
函数原型:

1
2
3
// 头文件
#include <sys/epoll.h>
int epoll_create(int size);

参数含义:
size:在 Linux 早期版本中,表示epoll实例预期监控的文件描述符数量,从 Linux 2.6.8 开始,这个参数会被忽略掉,但是需要大于 0。

返回值:成功时,返回新的epoll实例的文件描述符;失败返回-1,并设置 errno。

epoll_ctl

用途:管理epoll实例监控的文件描述符集(也就是那个红黑树),支持三种基本操作:添加、修改和删除对文件描述符的监控。

函数原型:

1
2
3
// 头文件
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

参数含义:
epfd:由epoll_create函数创建的epoll实例的文件描述符。
op:操作类型,共三种:添加(EPOLL_CTL_ADD)、修改(EPOLL_CTL_MOD)和删除(EPOLL_CTL_DEL)。
fd:需要被监控的文件描述符,注意不能与上述参数epfd相等,因为自己无法监控自己,但可以监控另一个epoll实例。
eventepoll_event类型的结构体指针,用于表示对应文件描述符需要检测的事件类型,epoll实例会拷贝这个指针指向的元素作为其内部新的结点。若操作类型为EPOLL_CTL_DEL,则这个值可以为NULL

返回值:调用成功返回0,失败返回-1,并设置 errno。

epoll_wait

用途:从epoll实例获取就绪事件。

函数原型:

1
2
3
4
// 头文件
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);

参数含义:
epfd:由epoll_create函数创建的epoll实例的文件描述符。
eventsepoll_event类型的结构体指针,是一个传出参数,用于存放就绪事件的结构体数组,所以一般是数组的首地址。
maxeventsevents数组所能接收的最大事件数量。
timeout:超时时间,-1表示无限阻塞,直到事件发生;0表示立即返回,不阻塞;>0表示等待指定毫秒数后返回。

返回值:调用成功时,返回就绪的文件描述符个数;超时返回0,表示无事件发生;-1表示调用失败,并设置 errno。

工作模式

在介绍完epoll相关 API 后,我们再额外说明一下epoll的两种工作模式。
首先是水平触发模式(Level-Triggered,LT),这是默认的工作模式。在这种模式下,只要文件描述符的状态就绪,操作系统内核就会产生事件并持续通知,如果用户一次性没有将数据处理完,那么下次仍会继续通知。

然后是边沿触发模式(Edge-Triggered,ET),这个模式需要用户手动设置,比如:

1
2
ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);

在这种模式下,就绪事件只通知一次,所以用户必须一次性处理所有可用数据,同时还需要使用非阻塞式 IO。

server

现在我们考虑如何使用epoll

ver1

先回顾一下使用poll的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
#include <stdio.h>
#include <ctype.h>
#include <string.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <poll.h>

int main() {
// 创建 socket
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd == -1) {
perror("socket error");
return -1;
}

// 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(fd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}

// 监听客户端请求
ret = listen(fd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}

// 创建 pollfd 集合
struct pollfd fds[1024];
// 初始化
for(int i = 0; i < 1024; ++i) {
fds[i].fd = -1;
fds[i].events = POLLIN; // 默认检测读事件
}
// 放入用于监听的文件描述符
fds[0].fd = fd;
int fds_idx = 0;
while(1) {
ret = poll(fds, fds_idx + 1, -1);
if(ret == -1) {
perror("poll error");
return -1;
}
// 用于监听的文件描述符有 POLLIN 事件
if(fds[0].revents & POLLIN) {
// 获得用于通信的文件描述符
int cfd = accept(fds[0].fd, NULL, NULL);
// 添加到 pollfd 数组中
for(int i = 0; i < 1024; ++i) {
if(fds[i].fd == -1) {
fds[i].fd = cfd;
fds_idx = i > fds_idx ? i : fds_idx;
break;
}
}
}
// 检测用于通信的文件描述符是否有事件
for(int i = 1; i <= fds_idx; ++i) {
// 加入数组的 pollfd 检测到 POLLIN 事件
if(fds[i].fd != -1 && fds[i].revents & POLLIN) {
char buf[1024];
// 接收客户端消息
int len = recv(fds[i].fd, buf, sizeof(buf), 0);
if(len > 0) {
printf("从客户端接收:%s, 文件描述符:%d\n", buf, fds[i].fd);
printf("len = %d\n", len);
printf("buf[0] = %d\n", buf[0]);
// 将从客户端接收到的小写字符转换为大写后发送给客户端
for(int k = 0; k < len; ++k) {
buf[k] = toupper(buf[k]);
}
printf("发送给客户端:%s, 文件描述符:%d\n", buf, fds[i].fd);
int ret = send(fds[i].fd, buf, len, 0);
if(ret == -1) {
perror("send error");
close(fds[i].fd);
fds[i].fd = -1;
break;
}
} else if(len == 0) {
printf("client closed\n");
close(fds[i].fd);
fds[i].fd = -1;
} else {
perror("recv error");
close(fds[i].fd);
fds[i].fd = -1;
}
}
}
}

close(fd);
return 0;
}

ver2

再使用epoll替代poll:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#include <stdio.h>
#include <ctype.h>
#include <string.h>

#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

int main() {
// 创建 socket
int lfd = socket(AF_INET, SOCK_STREAM, 0);
if(lfd == -1) {
perror("socket error");
return -1;
}

// 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(lfd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}

// 监听客户端请求
ret = listen(lfd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}

// 创建 epoll 实例
int epfd = epoll_create(1);
if(epfd == -1) {
perror("epoll_create error");
return -1;
}

// 创建 epoll_event 事件
struct epoll_event ev;
ev.events = EPOLLIN; // 默认 LT 模式
ev.data.fd = lfd;
// 将用于监听的描述符添加到 epoll 实例中
epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
// 创建 epoll_event 事件数组,用于接收返回的事件
struct epoll_event evs[1024];
int size = sizeof(evs) / sizeof(evs[0]);

while(1) {
int num = epoll_wait(epfd, evs, size, -1);
printf("num = %d\n", num);
if(num == -1) {
perror("epoll_wait");
return -1;
}
// 检查 epoll_wait 返回的事件
for(int i = 0; i < num; ++i) {
int fd = evs[i].data.fd;
// 如果是用于监听的文件描述符,说明有新连接
if(fd == lfd) {
int cfd = accept(fd, NULL, NULL);
ev.events = EPOLLIN;
ev.data.fd = cfd;
// 为 epoll 实例添加新结点
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
} else {
// 接收客户端消息
char buf[1024];
int len = recv(fd, buf, sizeof(buf), 0);
if(len > 0) {
printf("从客户端接收:%s, 文件描述符:%d\n", buf, fd);
printf("len = %d\n", len);
printf("buf[0] = %d\n", buf[0]);
// 将从客户端接收到的小写字符转换为大写后发送给客户端
for(int k = 0; k < len; ++k) {
buf[k] = toupper(buf[k]);
}
printf("发送给客户端:%s, 文件描述符:%d\n", buf, fd);
int ret = send(fd, buf, len, 0);
if(ret == -1) {
perror("send error");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
break;
}
} else if(len == 0) {
printf("client closed\n");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
} else {
perror("recv error");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
}
}
}

close(lfd);
return 0;
}

在上述代码中,我们使用epoll替换掉了poll来处理并发请求,且默认工作模式为LT。整体来讲,epollpoll的使用方式存在一些差异,其中:

  1. epoll使用三个单独的 API 函数来管理底层的红黑树;poll本身不提供对pollfd数组的 API,交给用户自己线性遍历。
  2. epoll使用evs数组接收返回的事件,这意味着epoll能处理的最大并发数量等于evs数组的大小(数组大小可变),但epoll能处理的总连接数是无限的;poll能处理的最大并发数量和连接数量都等于pollfd数组的大小。

还有一些细节:

  1. close(fd);之前,需要先从epoll实例中删除表示这个文件描述符的结点。
  2. 上述代码在测试过程中发现,分配给客户端的文件描述符是从5开始的,这是因为3被用于监听的 socket,4被用于epoll实例。但对操作系统来讲,文件描述符的分配取决于系统当前可用资源。
  3. 在 ver2 这部分代码中,我们默认epoll工作模式为LT,并设置buf数组大小为1024,如果此时客户端的消息长度大于1024,那么epoll会触发多次事件通知,直到缓冲区的消息被全部读完。按照这样的思路,在 ver2 的代码中,服务器会多次调用send函数向客户端发送消息。但由于 TCP 提供面向连接的字节流传输,没有消息边界的概念,所以这些小数据包可能会在 TCP 层被合并为一个连续流发送给客户端。换个角度思考,这实际上是削弱了用户对细节的把控能力,可能会产生意想不到的问题,应当避免。

ver2.1

现在再来尝试epollET工作模式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
#include <stdio.h>
#include <ctype.h>
#include <string.h>

#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <sys/epoll.h>

int main() {
// 创建 socket
int lfd = socket(AF_INET, SOCK_STREAM, 0);
if(lfd == -1) {
perror("socket error");
return -1;
}

// 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(lfd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}

// 监听客户端请求
ret = listen(lfd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}

// 创建 epoll 实例
int epfd = epoll_create(1);
if(epfd == -1) {
perror("epoll_create error");
return -1;
}

// 创建 epoll_event 事件
struct epoll_event ev;
ev.events = EPOLLIN; // 默认 LT 模式
ev.data.fd = lfd;
// 将用于监听的描述符添加到 epoll 实例中
epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
// 创建 epoll_event 事件数组,用于接收返回的事件
struct epoll_event evs[1024];
int size = sizeof(evs) / sizeof(evs[0]);

while(1) {
int num = epoll_wait(epfd, evs, size, -1);
printf("num = %d\n", num);
if(num == -1) {
perror("epoll_wait");
return -1;
}
// 检查 epoll_wait 返回的事件
for(int i = 0; i < num; ++i) {
int fd = evs[i].data.fd;
// 如果是用于监听的文件描述符,说明有新连接
if(fd == lfd) {
int cfd = accept(fd, NULL, NULL);
// 使用 ET 模式
ev.events = EPOLLIN | EPOLLET;
// 设置文件描述符为非阻塞
int flag = fcntl(cfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flag);
ev.data.fd = cfd;
// 为 epoll 实例添加新结点
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
} else {
// 接收客户端消息
while(1) {
char buf[5];
int len = recv(fd, buf, sizeof(buf), 0);
if(len > 0) {
printf("从客户端接收:%s, 文件描述符:%d\n", buf, fd);
printf("len = %d\n", len);
printf("buf[0] = %d\n", buf[0]);
// 将从客户端接收到的小写字符转换为大写后发送给客户端
for(int k = 0; k < len; ++k) {
buf[k] = toupper(buf[k]);
}
printf("发送给客户端:%s, 文件描述符:%d\n", buf, fd);
int ret = send(fd, buf, len, 0);
if(ret == -1) {
perror("send error");
// 为 epoll 实例删除结点
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
break;
}
} else if(len == 0) {
printf("client closed\n");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
break;
} else {
if(errno == EAGAIN || errno == EWOULDBLOCK) {
perror("recv completed");
} else {
perror("recv error");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
break;
}
}
}
}
}

close(lfd);
return 0;
}

仍然需要注意一些细节:

  1. 上述代码中,用于监听的文件描述符依然保持了LT模式,而用于通信的文件描述符则使用了ET模式。也就是说,epoll的工作模式是针对文件描述符而言的,可以具体细化到每个文件描述符。
  2. 使用ET模式后,需要将对应的文件描述符设置为非阻塞(Non-Blocking)模式,否则recv函数在接收完缓冲区的消息后,再次进入就会持续阻塞。
  3. 上述代码中,我们设置buf数组的大小是 5,若客户端发送的消息大于 5,则recv函数无法一次性读完,需要重复读取缓冲区。这就意味着,最后一次调用recv函数时,一定会产生错误,此时对应的errno就是EAGAIN,上述代码便是基于此错误码来判断何时退出循环。同时,由于recv函数的第三个参数,设置为buf的大小,这会导致在读取的字符个数正好与buf长度相等时,buf数组就没有\0字符了,而下面使用printf函数输出,就会存在问题了。不过这个问题,在这里影响不大(因为只是一条日志),只要确保服务器发送给客户端的消息是正常的即可。
  4. 上述代码同样会产生多次调用send函数向客户端发送消息的问题,实际环境中,应当避免此类问题。

ver2.2

在整理select相关内容的文章中,我们尝试了结合多线程一起使用,那epoll又该如何结合多线程呢?嗯,要做的事情与之前在select中所做的一致,我们使用主线程来管理客户端的连接,子线程负责处理 IO 事件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
#include <stdio.h>
#include <ctype.h>
#include <stdlib.h>

#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <unistd.h>

typedef struct epollinfo {
int epfd;
int fd;
} EpollInfo;

void* working(void *arg) {
printf("子线程 %ld 开始\n", pthread_self());
EpollInfo *epinfo = (EpollInfo*)arg;
int fd = epinfo->fd;
int epfd = epinfo->epfd;
// 接收客户端消息
char s_buf[1024];
int s_idx = 0;
while(1) {
char buf[5];
int len = recv(fd, buf, sizeof(buf), 0);
if(len > 0) {
printf("从客户端接收:%s, 文件描述符:%d\n", buf, fd);
printf("len = %d\n", len);
// 将从客户端接收到的小写字符转换为大写字符
for(int k = 0; k < len; ++k) {
s_buf[s_idx++] = toupper(buf[k]);
}
} else if(len == 0) {
printf("client closed\n");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
break;
} else {
if(errno == EAGAIN || errno == EWOULDBLOCK) {
printf("recv completed\n");
printf("发送给客户端:%s, 文件描述符:%d\n", s_buf, fd);
int ret = send(fd, s_buf, s_idx, 0);
// 发送失败
if(ret == -1) {
perror("send error");
// 为 epoll 实例删除结点
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
break;
}
} else {
perror("recv error");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
break;
}
}
free(epinfo);
printf("子线程 %ld 结束\n", pthread_self());
return NULL;
}

int main() {
// 创建 socket
int lfd = socket(AF_INET, SOCK_STREAM, 0);
if(lfd == -1) {
perror("socket error");
return -1;
}

// 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(lfd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}

// 监听客户端请求
ret = listen(lfd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}

// 创建 epoll 实例
int epfd = epoll_create(1);
if(epfd == -1) {
perror("epoll_create error");
return -1;
}

// 创建 epoll_event 事件
struct epoll_event ev;
ev.events = EPOLLIN; // 默认 LT 模式
ev.data.fd = lfd;
// 将用于监听的描述符添加到 epoll 实例中
epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
// 创建 epoll_event 事件数组,用于接收返回的事件
struct epoll_event evs[1024];
int size = sizeof(evs) / sizeof(evs[0]);

while(1) {
int num = epoll_wait(epfd, evs, size, -1);
printf("num = %d\n", num);
if(num == -1) {
perror("epoll_wait");
return -1;
}
// 检查 epoll_wait 返回的事件
for(int i = 0; i < num; ++i) {
int fd = evs[i].data.fd;
// 如果是用于监听的文件描述符,说明有新连接
if(fd == lfd) {
int cfd = accept(fd, NULL, NULL);
// 使用 ET 模式
ev.events = EPOLLIN | EPOLLET;
// 设置文件描述符为非阻塞
int flag = fcntl(cfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flag);
ev.data.fd = cfd;
// 为 epoll 实例添加新结点
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
} else {
EpollInfo *epinfo = (EpollInfo*)malloc(sizeof(EpollInfo));
epinfo->epfd = epfd;
epinfo->fd = fd;
pthread_t tid;
pthread_create(&tid, NULL, working, epinfo);
pthread_detach(tid);
}
}
}

close(lfd);
return 0;
}

依然需要注意一些细节:

  1. epoll_ctlepoll_createepoll_wait都是线程安全的,因为在 Linux 中,与epoll相关的函数其内部实现会加锁(网上查到的资料)。
  2. 子线程中调用recvsend时,可能会引发线程安全问题。因为,不同子线程可能处理同一个文件描述符的 IO 事件。不过这个问题,在这段代码中问题并不大。
  3. 为了避免recv函数阻塞,同样需要将指定文件描述符设置为非阻塞模式。

summary

到这里,有关epoll的基本内容就结束了。在实际生产环境中,epoll会与其他网络模型结合在一起使用,用于构造大型的高并发应用。
最后我们简单比较一下selectpollepoll

特性 select poll epoll
维护的数据结构 fd_set位图 pollfd数组 红黑树 + 双向链表
文件描述符限制 一般为 1024 无限制
就绪事件获取时间复杂度 $O(n)$ $O(n)$ $O(m)$
可扩展性
可移植性 Windows/Linux Linux Linux

Buy me a coffee ? :)
0%