前言
在上一篇文章中,我们已经在 Linux 下基于 socket 相关 API 实现了简单的服务器和客户端程序,但我们发现服务器与客户端只能一对一。于是,我们利用多线程的编程思想,重构了服务器程序,使得服务器支持了一对多。最后我们还指出,仅仅是基于多线程,是不足以支撑高并发需求的,需要考虑其他的手段。所以,这篇文章就介绍通过 IO 多路复用之 select 的方式来处理并发需求。
select 支持跨平台,但不同操作系统提供的 API 使用起来可能会有差异。
预备知识
在正式开始select
之前,先要了解两种数据结构。
fd_set
首先是fd_set
,是一种文件描述符集合,大概定义如下:1
2
3typedef struct {
unsigned long fds_bits[__FD_SETSIZE / (8 * sizeof(unsigned long))];
} fd_set;
按照定义,fd_set
是一个unsigned long
型的数组,大小是通过计算得到的。实际上,我们不需要考虑这个数组的大小是多少,只需要知道fds_bits
总位数是 1024 即可,这是遵守 POSIX 标准的结果(其实也可以改动,但需要重新编译系统内核)。这也就意味着,fd_set
其实是一种位图(bitmap),且它总共有 1024 位。
对应的,Linux 也提供了四种操作宏用于操作fd_set
:1
2
3
4
5
6
7
8
9
10// 头文件
// 从 fd_set 中移除 fd
void FD_CLR(int fd, fd_set *set);
// 检查 fd 是否在集合中,返回非零值表示存在
int FD_ISSET(int fd, fd_set *set);
// 将文件描述符 fd 加入集合
void FD_SET(int fd, fd_set *set);
// 清空集合
void FD_ZERO(fd_set *set);
timeval
然后,timeval
是一种表示时间的数据结构,用于定义超时时间,大概定义如下:1
2
3
4
5
6// 头文件
struct timeval {
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
select
现在我们来了解select
函数的相关内容,首先要知道的就是select
函数的功能。
select
函数可以将最大 1024 个文件描述符传递给内核,并由内核检测它们是否可读、可写或发生异常,从而避免了单文件描述符阻塞导致的程序停顿,实现 IO 多路复用。
这句话已经将select
的原理说的很清楚了,但还需要注意一些关键点:
select
是同步(而不是异步)处理并发的。select
函数将文件描述符传递给内核时,内核会拷贝传入的文件描述符,这里会带来额外的开销。
对应的函数原型:1
2
3
4// 头文件
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
参数含义:nfds
:需要检查的最大文件描述符加 1。readfds
:输入传入需要监听读就绪的 fd 集合;输出返回实际可读的 fd 集合,NULL
表示不监听读事件。writefds
:输入传入需要监听写就绪的 fd 集合;输出返回实际可写的 fd 集合,NULL
表示不监听写事件。exceptfds
:输入传入需要监听异常的 fd 集合;输出返回发生异常的 fd 集合,NULL
表示不监听异常事件。timeout
:指定最长阻塞等待时间,NULL
表示无限阻塞,直到有事件发生;{0, 0}
表示非阻塞轮询,立即返回;{>0, >=0}
表示阻塞指定时间,超时这个值会被改为{0, 0}
,需要重新设置。
返回值:函数调用成功,返回就绪 fd 的总数量,包括所有集合中的就绪 fd;超时返回 0,表示无任何 fd 就绪;调用失败返回 -1,并设置 errno。
readfds
、writefds
、exceptfds
和timeout
这四个参数都是传入传出参数,这意味着使用者传入的参数会在函数内被修改,这也是这四个参数都是指针的原因。
server
介绍完select
之后,就可以开始准备将select
应用在我们之前的服务器程序中了。
ver1
先回顾一下之前的代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
int main() {
// 1. 创建 socket
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd == -1) {
perror("socket error");
return -1;
}
// 2. 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(fd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}
// 3. 监听客户端请求
ret = listen(fd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}
// 4. 与客户端建立连接
struct sockaddr_in caddr;
int addrlen = sizeof(caddr);
int cfd = accept(fd, (struct sockaddr*)&caddr, &addrlen);
if(cfd == -1) {
perror("accept error");
return -1;
}
char buf[1024];
while(1) {
memset(buf, 0, sizeof(buf));
// 6. 接收客户端消息
int len = recv(cfd, buf, sizeof(buf), 0);
if(len > 0) {
printf("从客户端接收:%s\n", buf);
printf("len = %d\n", len);
printf("buf[0] = %d\n", buf[0]);
// 7. 向客户端发送消息
// 将从客户端接收到的小写字符转换为大写后发送给客户端
for(int k = 0; k < len; ++k) {
buf[k] = toupper(buf[k]);
}
printf("发送给客户端:%s\n", buf);
send(cfd, buf, len, 0);
} else if(len == 0) {
printf("client closed");
break;
} else {
perror("recv error");
break;
}
}
close(fd);
close(cfd);
return 0;
}
嗯,这是个很十分简单的服务器程序。
ver2
按照前面对select
函数的解释,我们需要让它来帮我我们判断文件描述符是否发生改变。同时为了实现一对多的情况,需要轮询这些文件描述符的状态,当某些文件描述符的状态发生变化时,select
函数会返回对应的fd_set
值,按照这些值,我们可以找到对应的文件描述符并做相应的处理。
基本的思路就是这样,最后得到的代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
int main() {
// 创建 socket
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd == -1) {
perror("socket error");
return -1;
}
// 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(fd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}
// 监听客户端请求
ret = listen(fd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}
// 创建用于读信号的文件描述符集合
fd_set rdset;
FD_ZERO(&rdset);
// 添加用于监听的文件描述符
FD_SET(fd, &rdset);
int maxfd = fd;
while(1) {
fd_set tmpset = rdset;
// select 会修改传入的 fd_set 所以传入的是临时 fd_set
int ret = select(maxfd + 1, &tmpset, NULL, NULL, NULL);
// 判断用于监听的文件描述符是否有读信号
if(FD_ISSET(fd, &tmpset)) {
// 不需要客户端信息时,省略 accept 函数的参数
int cfd = accept(fd, NULL, NULL);
// 将用于通信的文件描述符添加到 fd_set 中
FD_SET(cfd, &rdset);
// 重新判断 maxfd 的大小
maxfd = cfd > maxfd ? cfd : maxfd;
}
char buf[1024] = {0};
// 接收客户端消息
for(int i = 3; i <= maxfd; ++i) {
if(i != fd && FD_ISSET(i, &tmpset)) {
int len = recv(i, buf, sizeof(buf), 0);
if(len > 0) {
printf("从客户端接收:%s, 文件描述符:%d\n", buf, i);
printf("len = %d\n", len);
printf("buf[0] = %d\n", buf[0]);
// 向客户端发送消息
// 将从客户端接收到的小写字符转换为大写后发送给客户端
for(int k = 0; k < len; ++k) {
buf[k] = toupper(buf[k]);
}
printf("发送给客户端:%s, 文件描述符:%d\n", buf, i);
ret = send(i, buf, len, 0);
if(ret == -1) {
perror("send error");
break;
}
} else if(len == 0) {
printf("client closed\n");
FD_CLR(i, &rdset); // 清除断开连接的文件描述符
close(i); // 关闭断开的文件描述符
} else {
perror("recv error");
FD_CLR(i, &rdset); // 清除断开连接的文件描述符
close(i); // 关闭断开的文件描述符
}
}
}
}
close(fd);
return 0;
}
在上述代码中,有一些细节需要注意:
- 在首次调用
select
函数之前,我们需要将用于监听的文件描述符添加到对应的fd_set
中,不然会导致select
持续阻塞,原因在于select
检测不到任何文件描述符的读信号。 - 在接收客户端消息时,文件描述符是从
3
开始遍历的,这么做是因为在 Linux 中,每一个程序都会默认打开三个文件描述符,分别是:0 - stdin
标准输入流,1 - stdout
标准输出流,2 - stderr
标准错误流。尽管我们并没有将这三个文件描述符放入集合中,但select
函数默认也不会对这三个进行处理。 - 当
recv
函数返回值len
小于或等于0
时,不要忘记关闭断开的文件描述符回收资源,不然这些文件描述符就无法重用了,这是一种资源泄露。
ver3
前面我们已经提到了select
只能同步处理并发请求,要想做到真正的异步,还需要借助多线程。现在我们考虑将接收客户端消息和向客户端发送消息的任务交给子线程来完成,这么做的原因是做起来会更加简单。
于是,我们可以得到下面的代码:1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
void* working(void *arg) {
int fd = *(int*)arg;
while(1) {
char buf[1024];
int len = recv(fd, buf, sizeof(buf), 0);
if(len > 0) {
printf("线程 %ld,从客户端接收:%s, 文件描述符:%d\n", pthread_self(), buf, fd);
printf("len = %d\n", len);
printf("buf[0] = %d\n", buf[0]);
// 向客户端发送消息
// 将从客户端接收到的小写字符转换为大写后发送给客户端
for(int i = 0; i < len; ++i) {
buf[i] = toupper(buf[i]);
}
printf("线程 %ld,发送给客户端:%s, 文件描述符:%d\n", pthread_self() ,buf, fd);
int ret = send(fd, buf, len, 0);
if(ret == -1) {
perror("send error");
}
} else if(len == 0) {
printf("client closed\n");
printf("关闭文件描述符:%d\n", fd);
close(fd); // 关闭断开的文件描述符
break;
} else {
perror("recv error");
printf("关闭文件描述符:%d\n", fd);
close(fd); // 关闭断开的文件描述符
break;
}
}
printf("子线程 %ld 退出\n", pthread_self());
return NULL;
}
int main() {
// 创建 socket
int fd = socket(AF_INET, SOCK_STREAM, 0);
if(fd == -1) {
perror("socket error");
return -1;
}
// 绑定服务器 ip 和 port
struct sockaddr_in saddr;
saddr.sin_family = AF_INET;
saddr.sin_port = htons(2048);
saddr.sin_addr.s_addr = INADDR_ANY;
int ret = bind(fd, (struct sockaddr*)&saddr, sizeof(saddr));
if(ret == -1) {
perror("bind error");
return -1;
}
// 监听客户端请求
ret = listen(fd, 128);
if(ret == -1) {
perror("listen error");
return -1;
}
// 创建用于读信号的文件描述符集合
fd_set rdset;
FD_ZERO(&rdset);
// 添加用于监听的文件描述符
FD_SET(fd, &rdset);
int maxfd = fd;
while(1) {
fd_set tmpset = rdset;
// select 会修改传入的 fd_set 所以传入的是临时 fd_set
int ret = select(maxfd + 1, &tmpset, NULL, NULL, NULL);
// 判断用于监听的文件描述符是否有读信号
if(FD_ISSET(fd, &tmpset)) {
// 不需要客户端信息时,省略 accept 函数的参数
int cfd = accept(fd, NULL, NULL);
// 将用于通信的文件描述符添加到 fd_set 中
FD_SET(cfd, &rdset);
// 重新判断 maxfd 的大小
maxfd = cfd > maxfd ? cfd : maxfd;
}
for(int i = 3; i <= maxfd; ++i) {
if(i != fd && FD_ISSET(i, &tmpset)) {
printf("主线程 %ld: i = %d\n", pthread_self(), i);
int tmpfd = i;
// 移除文件描述符集合中即将交给子线程的文件描述符
// 避免多个子线程重复使用一个文件描述符
FD_CLR(i, &rdset);
// 创建子线程与客户端进行通信
// 用于通信的文件描述符交给子线程,由子线程管理
pthread_t tid;
pthread_create(&tid, NULL, working, &tmpfd);
pthread_detach(tid);
}
}
}
close(fd);
return 0;
}
这样,我们就实现好了一个基于多线程和 select 的并发服务器了,其中有一些细节仍然需要注意:
select
最大只能支持 1024 个文件描述符,排除0
、1
和2
,那么最大并发数只有 1021 个。- 将与客户端通信的文件描述符交给子线程之前,我们先将其移出了主线程所使用的文件描述符集合中,目的是为了避免多个子线程产生竞态条件。如果我们希望每个子线程都可以与任意客户端进行通信,那就需要考虑线程同步的问题,此时
rdset
就是共享资源,必要时需要使用互斥锁来避免竞态条件。 - 主线程使用轮询的方式来检测所有的文件描述符状态,子线程使用轮询的方式从指定的文件描述符接收数据。在这种情况下,轮询会极大的占用 CPU 资源。但好在主线程的
select
函数被我们设置为阻塞函数了,子线程的recv
函数也是阻塞函数。阻塞时,主、子线程都会让出 CPU 资源。 - 传递给线程入口函数的
&tmpfd
是局部变量tmpfd
的地址,但由于我们在入口函数中做的是拷贝操作,所以这里也是线程安全的,各个线程拿到文件描述符可以确定是不同的。
最后,还需要说明的是,select 本身就是为了避免多线程的开销而产生的技术手段,那这里使用多线程加 select 的优点是什么呢?其实也很明显,相比单纯使用多线程实现的服务器而言,这里的 select 加多线程模型,一是避免了频繁创建子线程带来的开销,二是委托内核一次性检测所有的文件描述符状态显然比单个子线程频繁调用accept
函数的效率更高。
但这样做也有缺点,一个线程占用一个子线程也会导致线程占用率较高,利用率较低。实际上,具体的任务交给线程池可能会更好。
另外,在 ver1、ver2 和 ver3 的代码中也可能存在一些异常或边界情况没有考虑,这些都不是本文的重点,也就不再讨论了。
summary
最后,我们再总结一下本文的内容:
- 介绍了与 IO 多路复用之 select 的基本概念和使用方法。
- 实现了基于 select 的并发服务器。
- 实现了基于 select 和多线程的并发服务器。
同时,我们还可以发现 select 的一些优缺点:
🙋♂️优点:
- 跨平台,各大系统基本都支持。
- 简单易用好理解,适合学习和理解 IO 多路复用的基本概念。
- 基于事件驱动,避免了为每个连接创建线程的开销,可以以单线程处理并发请求。
🤦♂️缺点:
- 强依赖
fd_set
这种数据结构,导致文件描述符的数量限制为 1024,尽管我们可以通过一些手段修改,但也略显麻烦。 - 每次传递给内核文件描述符集合时,都会发送拷贝操作,这带来了额外的性能开销。
- 尽管使用位图来表示文件描述符十分方便和高效,但内核检测这些文件描述符的时间复杂度仍然是$O(n)$。
- 无法处理大量的并发请求。
- 阻塞模式下,
select
函数内部会轮询文件描述符集合,直到事件产生才停止,效率较低。