多线程并发服务器

在多进程并发服务器的应用程序中,父进程accept一个连接,fork一个子进程,该子进程负责处理与该连接对端的客户之间的通信。
尽管多进程的编程模型中,各进程拥有独立的地址空间,减少了出错的概率,然而,fork调用却存在一些问题:

  • fork是昂贵的,fork要把父进程的内存映像复制到子进程,并在子进程中复制所有描述符,这个操作是较重量级的。
  • fork返回之后父子进程之间信息的传递需要进程间通信(IPC)机制。

线程则可以解决上述两个问题。线程有时也称为轻量级的进程,线程的创建可能比进程的创建快10-100倍。同一个进程内所有线程共享相同的全局内存,这使得线程之间易于共享信息,但伴随这种简易性而来的是线程安全问题。

线程函数

1. pthread_create 函数

我们介绍的第一个线程的函数是pthread_create,它的作用是创建一个新线程。它的定义如下:

1
2
#include <pthread.h>
int pthread_create(pthread_t *thread, pthread_attr_t *attr, void *(*start_routine)(void *), void *arg);

这个函数的定义看起来很复杂,其实用起来很简单。
第一个参数是指向pthread_t类型的指针。线程被创建时,这个指针指向的变量将被写入一个标识符(线程ID)我们用该标识符来引用新线程。
第二个参数用于设置线程的属性,一般不需要特殊的属性,所以只需要设置该参数为NULL。
最后两个参数,分别告诉新线程将要启动执行的函数和传递给该函数的参数。
pthread_create函数在成功调用时返回0,如果失败则返回失败码。

2. pthread_exit 函数

线程通过调用pthread_exit函数终止执行。这个函数的作用是,终止调用它的线程并返回一个指向某个对象的指针。

1
2
#include <pthread.h>
void pthread_exit(void *retval);

3. pthread_join 函数

pthread_join函数的作用是等待某个线程的结束。其第一个参数指定了需要等待的线程ID,第二个参数是一个二级指针,它指向另一个指针,而后者指向线程的返回值。

1
2
#include <pthread.h>
int pthread_join(pthread_t th, void **thread_return);

4. pthread_self 函数

每个线程都有一个在所属进程内标识自己的ID。线程ID由phtread_create返回,而且我们已经看到pthread_join也使用了线程ID来指定等待哪个线程。pthread_self的作用是返回自身的线程ID。

1
2
#include <pthread.h>
pthread_t pthread_self(void);

5. pthread_detach 函数

一个线程或者是可汇合(joinable),或者是脱离的(detached)。当一个可汇合的线程终止时,它的线程ID和退出状态将留存到另一个线程对它调用pthread_join的返回值中。脱离的线程终止时,所有相关资源都被释放,我们不能等待它们终止。如果一个线程需要知道另一个线程什么时候终止,那就最好保持第二个线程的可汇合状态。
pthread_detach函数把指定的线程转变为脱离的状态。

1
2
#include <pthread.h>
int pthread_detach(pthread_t th);

第一个线程例子

我们的第一个线程的例子如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

void *thread_function(void *arg);

char message[] = "Hello pthread!";

int main() {
int res;
pthread_t a_thread;
void *thread_result;

res = pthread_create(&a_thread, NULL, thread_function, (void*)message);
if (res != 0) {
perror("Thread creation failed.");
exit(EXIT_FAILURE);
}
printf("Waiting for thread to finish..\n");
res = pthread_join(a_thread, &thread_result);
if (res != 0) {
perror("Thread join failed.");
exit(EXIT_FAILURE);
}
printf("Thread joined, it returned %s\n", (char*)thread_result);
printf("Message is now %s\n", message);
exit(EXIT_SUCCESS);
}

void *thread_function(void *arg) {
printf("Thread_function is running. Argument was %s\n", (char*)arg);
sleep(3);
strcpy(message, "Bye!");
pthread_exit("Thank you for the CPU time.");
}

首先,我们调用pthread_create创建了一个新线程,在调用pthread_create函数时,我们向其传递了一个函数指针thread_function,即新线程的执行函数,以及传递给该执行函数的参数message
创建新线程后,主线程通过pthread_join等待新线程的执行完毕。而新线程执行thread_function函数,修改全局数据message,然后退出线程并向主线程返回一个字符串。主线程等待新线程执行完毕后,获得新线程的返回值和修改后的全局数组。

编译运行上面的程序,得到以下输出:

Waiting for thread to finish..
thread_function is running. Argument was Hello pthread!
Thread joined, it returned Thank you for the CPU time.
Message is now Bye!

基于线程的并发服务器

下面展示了基于线程的并发服务器的代码。整体结构类似于基于进程的设计。主线程不断地等待连接请求,然后创建一个新线程处理该请求。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
#include <sys/types.h>
#include <sys/socket.h>
#include <stdio.h>
#include <netinet/in.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>

void *thread_function(void *arg);

int main() {
int listenfd, *connfdp;
socklen_t server_len, client_len;
struct sockaddr_in server_address;
struct sockaddr_in client_address;
pthread_t th;

// 创建套接字
listenfd = socket(AF_INET, SOCK_STREAM, 0);

// 命名套接字
server_address.sin_family = AF_INET;
server_address.sin_addr.s_addr = htonl(INADDR_ANY);
server_address.sin_port = htons(6240);
server_len = sizeof(server_address);
bind(listenfd, (struct sockaddr*)&server_address, server_len);

// 创建套接字队列
listen(listenfd, 5);

// 接受客户的连接
while (1) {
printf("server waiting\n");
connfdp = malloc(sizeof(int));
client_len = sizeof(client_address);
*connfdp = accept(listenfd, (struct sockaddr*)&client_address, &client_len);
// 创建新线程
pthread_create(&th, NULL, thread_function, connfdp);
}
}

void *thread_function(void *arg) {
int connfd = *((int*)arg);
printf("Thread_function is running. Argument was %d\n", connfd);
pthread_detach(pthread_self());
free(arg);

// 处理客户的请求
char ch;
read(connfd, &ch, 1);
ch++;
write(connfd, &ch, 1);

close(connfd);
return NULL;
}

代码虽然较简单,但有几个地方值得我们重点关注一下。
第一个问题是我们在调用pthread_create时,如何将已连接套接字描述符传递给新线程。最容易想到的方法如下:

1
2
connfd = accept(listenfd, (struct sockaddr*)&client_address, &client_len);
pthread_create(&th, NULL, thread_function, &connfd);

然后,在线程函数中引用这个指针变量,并将其赋值给一个局部变量。

1
2
3
4
void *thread_function(void *arg) {
int connfd = *((int*)arg);
...
}

然而,这个做法可能会带来线程安全的问题。如果赋值语句在下一个accept之前完成,那么线程函数中的局部变量connfd将得到正确值。如果赋值语句在下一个accept之后才完成,那么线程函数中的局部变量connfd就会得到下一次连接的描述符的值。这显然不是我们想要的结果。
为了避免这种情况的出现,每次调用accept返回时,将返回的已连接套接字描述符存储在动态分配的内存中,这样无论线程函数中的赋值先于还是后于下一个accept完成,都不会出现线程安全的问题。

另一个问题是在线程函数中避免存储器资源泄漏。既然我们不显示式回收线程,我们就必须分离每个线程,使得它们在终止时存储器资源能够被回收。另外,还有一点需要提醒的,在线程函数中必须将主线程分配的动态内存释放了。

最后一个问题是如何关闭套接字描述符的问题。在基于进程的服务器中,我们在父进程和子进程两个位置都关闭了已连接套接字描述符。但在基于线程的服务器中,我们只需要在线程函数中关闭已连接套接字描述符,而不需要在主线程中关闭。
在Linux系统中,每个文件或者套接字都有一个引用计数,引用计数在文件表项中维护,它是当前打开着的引用该文件或者套接字的描述符的个数。对于多进程服务器的情形,已连接套接字描述符在父进程和子进程间共享(也就是被复制),因此已连接套接字相关联的文件表项的访问计数值为2,故在父进程和子进程都需要执行close操作。而对于多线程服务器的情形,由于线程间具有相同的地址空间,套接字描述符并不进程复制操作,即已连接套接字描述符的计数值为1,故只需要在创建的新线程中执行一次close操作即可。

参考资料

  1. 深入理解计算机系统,第2版,机械工业出版社
  2. Linux程序设计(第4版),Neil Matthew等著,人民邮电出版社,2010年
  3. UNIX 网络编程卷1:套接字联网API(第三版), W.Richard Stevens 等著
  4. http://www.tuicool.com/articles/fiEfaa