前言
在前两篇文章中,我们已经学会了如何使用select
和poll
这两种 IO 多路复用方式,并且还比较了它们之间的相同点和差异点。由此我们知道了,poll
解决了select
的文件操作符数量限制问题。但由于poll
与select
一样都是以$O(n)$的时间复杂度来检测文件描述符集合,那么在高并发的情况下,poll
的处理效率就会显著下降。为了解决这个问题,Linux 又引入了epoll
这种 IO 多路复用的方式。
❗注意:epoll 是 Linux 系统独有的 IO 多路复用方式。
预备知识
在正式进入epoll
的内容之前,还是先预热一下。
红黑树
首先我们需要简单了解一下红黑树,因为epoll
底层就维护了一个红黑树。
红黑树(Red Black Tree)是一种自平衡的二叉查找树(BST 树),相比平衡二叉树(AVL 树)而言,它的左右子树高度差可能会大于 1,所以它不是严格意义上的平衡二叉树(只能算作变体)。同时,红黑树具备二叉查找树的特征,对于普通二叉查找树的算法,红黑树同样适用。换句话说,红黑树的查找、插入和删除操作的时间复杂度均为$O(logn)$。
相比于select
和poll
的线性时间复杂,$O(logn)$的效率就很快了,特别是在大数据量的情况下,不过这并不是epoll
能如此高效的秘诀。
实际上,除了红黑树外,epoll
底层还维护了一个就绪列表,这是一个双向链表,并且保存了所有就绪事件。当某个被监控的文件描述符有事件发生时,内核会立即将其对应的epoll_event
结构体加入到这个就绪列表中。
而调用epoll_wait
函数时,内核就只需要检查这个链表,若非空,就拷贝到用户提供的数组中,对应的时间复杂度是$O(m)($m$是就绪事件数)。
对比select
和poll
的$O(n)$,在百万连接的情况下,$O(m)$的时间复杂度就要小很多很多了。
epoll_data_t
epoll_data_t
是一个联合体,它联合了四个数据成员,这四个数据成员共同使用一块大小为 8 字节的内存,大概定义如下:1
2
3
4
5
6
7
8// 头文件
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
每个成员的含义比较直观,没什么好解释的。
epoll_event
epoll_event
是一个结构体,它组合了事件和epoll_data_t
这两个成员,大概定义如下:1
2
3
4
5
6// 头文件
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
与poll
类似,epoll
的events
也是一种位掩码,所以也会又一些(与事件类型对应的)常量用作与之位运算:
常量 | events 传入 |
evnets 返回 |
说明 |
---|---|---|---|
EPOLLIN | √ | √ | 表示监视可读事件;返回表示fd可读 |
EPOLLOUT | √ | √ | 表示监视可写事件;返回表示fd可写 |
EPOLLRDHUP | √ | √ | 表示监视对端关闭(或半关闭);返回表示对端关闭 |
EPOLLPRI | √ | √ | 表示监视紧急数据可读;返回表示有紧急数据可读 |
EPOLLERR | × | √ | 返回表示 fd 发生错误 |
EPOLLHUP | × | √ | 返回表示 fd 被挂断(对端关闭连接) |
EPOLLLET | √ | × | 表示使用边缘触发模式,不返回 |
EPOLLONESHOT | √ | × | 表示只监听一次事件,事件发生后需要重新添加,不返回 |
EPOLLWAKEUP | √ | × | 用于防止系统休眠,不返回 |
EPOLLEXCLUSIVE | √ | × | 用于避免多线程惊群,不返回 |
epoll
现在我们来了解epoll
的相关内容。epoll
与poll
、select
不同,它主要涉及三个 API 函数,分别是epoll_create
、epoll_ctl
和epoll_wait
。同时,epoll
还有两种工作模式:边沿触发(edge-triggered)和水平触发(level-triggered),默认使用水平触发模式。
下面我们逐一介绍epoll
的 API 函数。
epoll_create
用途:创建一个epoll
实例。
函数原型:1
2
3// 头文件
int epoll_create(int size);
参数含义:size
:在 Linux 早期版本中,表示epoll
实例预期监控的文件描述符数量,从 Linux 2.6.8 开始,这个参数会被忽略掉,但是需要大于 0。
返回值:成功时,返回新的epoll
实例的文件描述符;失败返回-1
,并设置 errno。
epoll_ctl
用途:管理epoll
实例监控的文件描述符集(也就是那个红黑树),支持三种基本操作:添加、修改和删除对文件描述符的监控。
函数原型:1
2
3// 头文件
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
参数含义:epfd
:由epoll_create
函数创建的epoll
实例的文件描述符。op
:操作类型,共三种:添加(EPOLL_CTL_ADD
)、修改(EPOLL_CTL_MOD
)和删除(EPOLL_CTL_DEL
)。fd
:需要被监控的文件描述符,注意不能与上述参数epfd
相等,因为自己无法监控自己,但可以监控另一个epoll
实例。event
:epoll_event
类型的结构体指针,用于表示对应文件描述符需要检测的事件类型,epoll
实例会拷贝这个指针指向的元素作为其内部新的结点。若操作类型为EPOLL_CTL_DEL
,则这个值可以为NULL
。
返回值:调用成功返回0
,失败返回-1
,并设置 errno。
epoll_wait
用途:从epoll
实例获取就绪事件。
函数原型:1
2
3
4// 头文件
int epoll_wait(int epfd, struct epoll_event *events,
int maxevents, int timeout);
参数含义:epfd
:由epoll_create
函数创建的epoll
实例的文件描述符。events
:epoll_event
类型的结构体指针,是一个传出参数,用于存放就绪事件的结构体数组,所以一般是数组的首地址。maxevents
:events
数组所能接收的最大事件数量。timeout
:超时时间,-1
表示无限阻塞,直到事件发生;0
表示立即返回,不阻塞;>0
表示等待指定毫秒数后返回。
返回值:调用成功时,返回就绪的文件描述符个数;超时返回0
,表示无事件发生;-1
表示调用失败,并设置 errno。
工作模式
在介绍完epoll
相关 API 后,我们再额外说明一下epoll
的两种工作模式。
首先是水平触发模式(Level-Triggered,LT),这是默认的工作模式。在这种模式下,只要文件描述符的状态就绪,操作系统内核就会产生事件并持续通知,如果用户一次性没有将数据处理完,那么下次仍会继续通知。
然后是边沿触发模式(Edge-Triggered,ET),这个模式需要用户手动设置,比如:1
2ev.events = EPOLLIN | EPOLLET;
epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ev);
在这种模式下,就绪事件只通知一次,所以用户必须一次性处理所有可用数据,同时还需要使用非阻塞式 IO。
server
现在我们考虑如何使用epoll
。
ver1
先回顾一下使用poll
的代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
int main() {
// 创建 socket
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd == -1) {
perror("socket error");
return -1;
}
// 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(fd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}
// 监听客户端请求
ret = listen(fd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}
// 创建 pollfd 集合
struct pollfd fds[1024];
// 初始化
for(int i = 0; i < 1024; ++i) {
fds[i].fd = -1;
fds[i].events = POLLIN; // 默认检测读事件
}
// 放入用于监听的文件描述符
fds[0].fd = fd;
int fds_idx = 0;
while(1) {
ret = poll(fds, fds_idx + 1, -1);
if(ret == -1) {
perror("poll error");
return -1;
}
// 用于监听的文件描述符有 POLLIN 事件
if(fds[0].revents & POLLIN) {
// 获得用于通信的文件描述符
int cfd = accept(fds[0].fd, NULL, NULL);
// 添加到 pollfd 数组中
for(int i = 0; i < 1024; ++i) {
if(fds[i].fd == -1) {
fds[i].fd = cfd;
fds_idx = i > fds_idx ? i : fds_idx;
break;
}
}
}
// 检测用于通信的文件描述符是否有事件
for(int i = 1; i <= fds_idx; ++i) {
// 加入数组的 pollfd 检测到 POLLIN 事件
if(fds[i].fd != -1 && fds[i].revents & POLLIN) {
char buf[1024];
// 接收客户端消息
int len = recv(fds[i].fd, buf, sizeof(buf), 0);
if(len > 0) {
printf("从客户端接收:%s, 文件描述符:%d\n", buf, fds[i].fd);
printf("len = %d\n", len);
printf("buf[0] = %d\n", buf[0]);
// 将从客户端接收到的小写字符转换为大写后发送给客户端
for(int k = 0; k < len; ++k) {
buf[k] = toupper(buf[k]);
}
printf("发送给客户端:%s, 文件描述符:%d\n", buf, fds[i].fd);
int ret = send(fds[i].fd, buf, len, 0);
if(ret == -1) {
perror("send error");
close(fds[i].fd);
fds[i].fd = -1;
break;
}
} else if(len == 0) {
printf("client closed\n");
close(fds[i].fd);
fds[i].fd = -1;
} else {
perror("recv error");
close(fds[i].fd);
fds[i].fd = -1;
}
}
}
}
close(fd);
return 0;
}
ver2
再使用epoll
替代poll
:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
int main() {
// 创建 socket
int lfd = socket(AF_INET, SOCK_STREAM, 0);
if(lfd == -1) {
perror("socket error");
return -1;
}
// 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(lfd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}
// 监听客户端请求
ret = listen(lfd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}
// 创建 epoll 实例
int epfd = epoll_create(1);
if(epfd == -1) {
perror("epoll_create error");
return -1;
}
// 创建 epoll_event 事件
struct epoll_event ev;
ev.events = EPOLLIN; // 默认 LT 模式
ev.data.fd = lfd;
// 将用于监听的描述符添加到 epoll 实例中
epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
// 创建 epoll_event 事件数组,用于接收返回的事件
struct epoll_event evs[1024];
int size = sizeof(evs) / sizeof(evs[0]);
while(1) {
int num = epoll_wait(epfd, evs, size, -1);
printf("num = %d\n", num);
if(num == -1) {
perror("epoll_wait");
return -1;
}
// 检查 epoll_wait 返回的事件
for(int i = 0; i < num; ++i) {
int fd = evs[i].data.fd;
// 如果是用于监听的文件描述符,说明有新连接
if(fd == lfd) {
int cfd = accept(fd, NULL, NULL);
ev.events = EPOLLIN;
ev.data.fd = cfd;
// 为 epoll 实例添加新结点
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
} else {
// 接收客户端消息
char buf[1024];
int len = recv(fd, buf, sizeof(buf), 0);
if(len > 0) {
printf("从客户端接收:%s, 文件描述符:%d\n", buf, fd);
printf("len = %d\n", len);
printf("buf[0] = %d\n", buf[0]);
// 将从客户端接收到的小写字符转换为大写后发送给客户端
for(int k = 0; k < len; ++k) {
buf[k] = toupper(buf[k]);
}
printf("发送给客户端:%s, 文件描述符:%d\n", buf, fd);
int ret = send(fd, buf, len, 0);
if(ret == -1) {
perror("send error");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
break;
}
} else if(len == 0) {
printf("client closed\n");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
} else {
perror("recv error");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
}
}
}
close(lfd);
return 0;
}
在上述代码中,我们使用epoll
替换掉了poll
来处理并发请求,且默认工作模式为LT
。整体来讲,epoll
与poll
的使用方式存在一些差异,其中:
epoll
使用三个单独的 API 函数来管理底层的红黑树;poll
本身不提供对pollfd
数组的 API,交给用户自己线性遍历。epoll
使用evs
数组接收返回的事件,这意味着epoll
能处理的最大并发数量等于evs
数组的大小(数组大小可变),但epoll
能处理的总连接数是无限的;poll
能处理的最大并发数量和连接数量都等于pollfd
数组的大小。
还有一些细节:
- 在
close(fd);
之前,需要先从epoll
实例中删除表示这个文件描述符的结点。 - 上述代码在测试过程中发现,分配给客户端的文件描述符是从
5
开始的,这是因为3
被用于监听的 socket,4
被用于epoll
实例。但对操作系统来讲,文件描述符的分配取决于系统当前可用资源。 - 在 ver2 这部分代码中,我们默认
epoll
工作模式为LT
,并设置buf
数组大小为1024
,如果此时客户端的消息长度大于1024
,那么epoll
会触发多次事件通知,直到缓冲区的消息被全部读完。按照这样的思路,在 ver2 的代码中,服务器会多次调用send
函数向客户端发送消息。但由于 TCP 提供面向连接的字节流传输,没有消息边界的概念,所以这些小数据包可能会在 TCP 层被合并为一个连续流发送给客户端。换个角度思考,这实际上是削弱了用户对细节的把控能力,可能会产生意想不到的问题,应当避免。
ver2.1
现在再来尝试epoll
的ET
工作模式:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
int main() {
// 创建 socket
int lfd = socket(AF_INET, SOCK_STREAM, 0);
if(lfd == -1) {
perror("socket error");
return -1;
}
// 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(lfd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}
// 监听客户端请求
ret = listen(lfd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}
// 创建 epoll 实例
int epfd = epoll_create(1);
if(epfd == -1) {
perror("epoll_create error");
return -1;
}
// 创建 epoll_event 事件
struct epoll_event ev;
ev.events = EPOLLIN; // 默认 LT 模式
ev.data.fd = lfd;
// 将用于监听的描述符添加到 epoll 实例中
epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
// 创建 epoll_event 事件数组,用于接收返回的事件
struct epoll_event evs[1024];
int size = sizeof(evs) / sizeof(evs[0]);
while(1) {
int num = epoll_wait(epfd, evs, size, -1);
printf("num = %d\n", num);
if(num == -1) {
perror("epoll_wait");
return -1;
}
// 检查 epoll_wait 返回的事件
for(int i = 0; i < num; ++i) {
int fd = evs[i].data.fd;
// 如果是用于监听的文件描述符,说明有新连接
if(fd == lfd) {
int cfd = accept(fd, NULL, NULL);
// 使用 ET 模式
ev.events = EPOLLIN | EPOLLET;
// 设置文件描述符为非阻塞
int flag = fcntl(cfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flag);
ev.data.fd = cfd;
// 为 epoll 实例添加新结点
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
} else {
// 接收客户端消息
while(1) {
char buf[5];
int len = recv(fd, buf, sizeof(buf), 0);
if(len > 0) {
printf("从客户端接收:%s, 文件描述符:%d\n", buf, fd);
printf("len = %d\n", len);
printf("buf[0] = %d\n", buf[0]);
// 将从客户端接收到的小写字符转换为大写后发送给客户端
for(int k = 0; k < len; ++k) {
buf[k] = toupper(buf[k]);
}
printf("发送给客户端:%s, 文件描述符:%d\n", buf, fd);
int ret = send(fd, buf, len, 0);
if(ret == -1) {
perror("send error");
// 为 epoll 实例删除结点
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
break;
}
} else if(len == 0) {
printf("client closed\n");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
break;
} else {
if(errno == EAGAIN || errno == EWOULDBLOCK) {
perror("recv completed");
} else {
perror("recv error");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
break;
}
}
}
}
}
close(lfd);
return 0;
}
仍然需要注意一些细节:
- 上述代码中,用于监听的文件描述符依然保持了
LT
模式,而用于通信的文件描述符则使用了ET
模式。也就是说,epoll
的工作模式是针对文件描述符而言的,可以具体细化到每个文件描述符。 - 使用
ET
模式后,需要将对应的文件描述符设置为非阻塞(Non-Blocking)模式,否则recv
函数在接收完缓冲区的消息后,再次进入就会持续阻塞。 - 上述代码中,我们设置
buf
数组的大小是 5,若客户端发送的消息大于 5,则recv
函数无法一次性读完,需要重复读取缓冲区。这就意味着,最后一次调用recv
函数时,一定会产生错误,此时对应的errno
就是EAGAIN
,上述代码便是基于此错误码来判断何时退出循环。同时,由于recv
函数的第三个参数,设置为buf
的大小,这会导致在读取的字符个数正好与buf
长度相等时,buf
数组就没有\0
字符了,而下面使用printf
函数输出,就会存在问题了。不过这个问题,在这里影响不大(因为只是一条日志),只要确保服务器发送给客户端的消息是正常的即可。 - 上述代码同样会产生多次调用
send
函数向客户端发送消息的问题,实际环境中,应当避免此类问题。
ver2.2
在整理select
相关内容的文章中,我们尝试了结合多线程一起使用,那epoll
又该如何结合多线程呢?嗯,要做的事情与之前在select
中所做的一致,我们使用主线程来管理客户端的连接,子线程负责处理 IO 事件:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
typedef struct epollinfo {
int epfd;
int fd;
} EpollInfo;
void* working(void *arg) {
printf("子线程 %ld 开始\n", pthread_self());
EpollInfo *epinfo = (EpollInfo*)arg;
int fd = epinfo->fd;
int epfd = epinfo->epfd;
// 接收客户端消息
char s_buf[1024];
int s_idx = 0;
while(1) {
char buf[5];
int len = recv(fd, buf, sizeof(buf), 0);
if(len > 0) {
printf("从客户端接收:%s, 文件描述符:%d\n", buf, fd);
printf("len = %d\n", len);
// 将从客户端接收到的小写字符转换为大写字符
for(int k = 0; k < len; ++k) {
s_buf[s_idx++] = toupper(buf[k]);
}
} else if(len == 0) {
printf("client closed\n");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
break;
} else {
if(errno == EAGAIN || errno == EWOULDBLOCK) {
printf("recv completed\n");
printf("发送给客户端:%s, 文件描述符:%d\n", s_buf, fd);
int ret = send(fd, s_buf, s_idx, 0);
// 发送失败
if(ret == -1) {
perror("send error");
// 为 epoll 实例删除结点
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
break;
}
} else {
perror("recv error");
epoll_ctl(epfd, EPOLL_CTL_DEL, fd, NULL);
close(fd);
}
break;
}
}
free(epinfo);
printf("子线程 %ld 结束\n", pthread_self());
return NULL;
}
int main() {
// 创建 socket
int lfd = socket(AF_INET, SOCK_STREAM, 0);
if(lfd == -1) {
perror("socket error");
return -1;
}
// 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(lfd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}
// 监听客户端请求
ret = listen(lfd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}
// 创建 epoll 实例
int epfd = epoll_create(1);
if(epfd == -1) {
perror("epoll_create error");
return -1;
}
// 创建 epoll_event 事件
struct epoll_event ev;
ev.events = EPOLLIN; // 默认 LT 模式
ev.data.fd = lfd;
// 将用于监听的描述符添加到 epoll 实例中
epoll_ctl(epfd, EPOLL_CTL_ADD, lfd, &ev);
// 创建 epoll_event 事件数组,用于接收返回的事件
struct epoll_event evs[1024];
int size = sizeof(evs) / sizeof(evs[0]);
while(1) {
int num = epoll_wait(epfd, evs, size, -1);
printf("num = %d\n", num);
if(num == -1) {
perror("epoll_wait");
return -1;
}
// 检查 epoll_wait 返回的事件
for(int i = 0; i < num; ++i) {
int fd = evs[i].data.fd;
// 如果是用于监听的文件描述符,说明有新连接
if(fd == lfd) {
int cfd = accept(fd, NULL, NULL);
// 使用 ET 模式
ev.events = EPOLLIN | EPOLLET;
// 设置文件描述符为非阻塞
int flag = fcntl(cfd, F_GETFL);
flag |= O_NONBLOCK;
fcntl(cfd, F_SETFL, flag);
ev.data.fd = cfd;
// 为 epoll 实例添加新结点
epoll_ctl(epfd, EPOLL_CTL_ADD, cfd, &ev);
} else {
EpollInfo *epinfo = (EpollInfo*)malloc(sizeof(EpollInfo));
epinfo->epfd = epfd;
epinfo->fd = fd;
pthread_t tid;
pthread_create(&tid, NULL, working, epinfo);
pthread_detach(tid);
}
}
}
close(lfd);
return 0;
}
依然需要注意一些细节:
epoll_ctl
、epoll_create
和epoll_wait
都是线程安全的,因为在 Linux 中,与epoll
相关的函数其内部实现会加锁(网上查到的资料)。- 子线程中调用
recv
和send
时,可能会引发线程安全问题。因为,不同子线程可能处理同一个文件描述符的 IO 事件。不过这个问题,在这段代码中问题并不大。 - 为了避免
recv
函数阻塞,同样需要将指定文件描述符设置为非阻塞模式。
summary
到这里,有关epoll
的基本内容就结束了。在实际生产环境中,epoll
会与其他网络模型结合在一起使用,用于构造大型的高并发应用。
最后我们简单比较一下select
、poll
和epoll
:
特性 | select |
poll |
epoll |
---|---|---|---|
维护的数据结构 | fd_set 位图 |
pollfd 数组 |
红黑树 + 双向链表 |
文件描述符限制 | 一般为 1024 | 无限制 | |
就绪事件获取时间复杂度 | $O(n)$ | $O(n)$ | $O(m)$ |
可扩展性 | 差 | 中 | 优 |
可移植性 | Windows/Linux | Linux | Linux |