CSAPP 12 - 并发编程
如果逻辑控制流在时间上重叠,那么它们就是并发的。并发出现在计算机系统许多层面上,使用应用级并发的程序称为并发程序。
基于进程
构造并发程序最简单的方法就是用进程。一个构造并发服务器的自然方法是:在父进程中接受客户端的链接请求,然后创建一个新的子进程来为每个新客户端提供服务。
假设有两个客户端和一个服务器,服务器在监听一个监听描述符 (3) 上的连接请求。现在假设服务器接受了客户端 1 的连接请求,并返回一个已连接描述符 (4)。在接受连接请求后,服务器派生一个子进程,这个子进程获得服务器进程描述符表的完整副本。子进程关闭它的副本中的描述符 3,而父进程关闭它的已连接描述符 4 的副本。
父子进程的已连接描述符都指向一个文件表表项,所以父进程必须关闭它的已连接描述符的副本。否则,将永不会释放描述符 4 的文件表条目,这会引起内存泄漏。
现在,假设父进程为客户端 1 创建了子进程后,它接受一个新的客户端 2 的连接请求,并返回一个新的已连接文件描述符 (5),然后父进程又派生一个新的子进程,这个子进程用于为客户端 2 提供服务:
并发服务器
将前一章中的 echo 服务器修改为基于进程的并发服务器,有以下注意:
- 通常服务器都会运行很长时间,所以必须包括一个 SIGCHLD 处理程序,来回收僵死子进程的资源。因为 Linux 信号是不排队的,所以 SIGCHLD 处理程序必须准备好回收多个僵死子进程的资源。
- 父子进程必须关闭它们各自的 connfd,以避免内存泄漏。
- 因为套接字文件表表项中的引用技术,只有父子进程的 connfd 都关闭了,到客户端的连接才会终止。
#include "assist.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>
#include <sys/socket.h>
void sigchld_handler(int sig)
{
while (waitpid(-1, 0, WNOHANG) > 0)
;
}
int main(int argc, char **argv)
{
int listen_fd, client_fd;
socklen_t client_len;
struct sockaddr_storage client_addr;
char client_hostname[MAXLINE], client_port[MAXLINE];
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
signal(SIGCHLD, sigchld_handler);
listen_fd = open_listen_fd(argv[1]);
if (listen_fd == -1) {
fprintf(stderr, "cannot create a listen socket\n");
exit(1);
}
for (;;) {
client_len = sizeof(client_addr);
client_fd = accept(listen_fd, (SA)&client_addr, &client_len);
if (fork() == 0) {
close(listen_fd);
echo(client_fd);
close(client_fd);
exit(0);
}
fprintf(stderr, "new connection\n");
close(client_fd);
}
}
进程的优劣
对于在父子进程间共享状态,进程有一个非常清晰的模型。共享文件表,但是不共享用户地址空间。进程有独立的地址空间既是优点也是缺点。这样一来,一个进程不可能不小心覆盖另一个进程的虚拟内存,这是一个优点。
另一方面,独立的地址空间会使得进程共享状态信息变得更加困难,为了共享信息,它们必须使用显示的 IPC 机制。基于进程的设计的另一个缺点是,它们往往比较慢,因为进程控制和 IPC 的开销很高。
I/O 多路复用
如果 echo 服务器也要能对用户从标准输入键入的命令作出响应,那么服务器必须响应两个独立的 I/O 事件,问题是无法确定先等待哪个事件。一个解决方案是使用 I/O 多路复用技术。基本思想就是使用 select 函数,要求内核挂起进程,只有在一个或多个 I/O 事件发生后,才将控制返回给进程。
select 函数比较复杂,有许多不同的使用场景,这里只讨论第一种场景:等待一组描述符准备好读。
int select(int n, fd_set *fdset, NULL, NULL, NULL);
FD_ZERO(fd_set *fdset);
FD_CLR(int fd, fd_set *fdset);
FD_SET(int fd, fd_set *fdset);
FD_ISSET(int fd, fd_set *fdset);
select 函数处理类型为 fd_set 的集合,也叫做描述符集合,逻辑上,可以将描述符集合看成一个大小为 n 的位向量:bn-1 … b1 b0。
其中,每个位 bk 对应描述符 k。当且仅当 bk = 1,描述符 k 才表明是集合的一个元素。对 fd_set 只能有三种操作:分配,复制,使用上述宏。
select 函数的参数中,第二个参数就是读描述符集合。第一个参数是描述符集合的基数 (任何描述符集合的最大基数) 其实就是要监听的最大描述符 n + 1。调用 select 函数会一直阻塞,直到读集合中至少有一个描述符准备好可以读。select 函数会修改参数 fdset,修改后 fdset 就是读集合的一个子集,称为准备好集合。
一个简单的例子:
#include "assist.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
void command();
int main(int argc, char **argv)
{
int listen_fd, client_fd;
socklen_t client_len;
struct sockaddr_storage client_addr;
char client_hostname[MAXLINE], client_port[MAXLINE];
fd_set read_set, ready_set;
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
listen_fd = open_listen_fd(argv[1]);
if (listen_fd == -1) {
fprintf(stderr, "cannot create a listen socket\n");
exit(1);
}
FD_ZERO(&read_set);
FD_SET(STDIN_FILENO, &read_set);
FD_SET(listen_fd, &read_set);
for (;;) {
ready_set = read_set;
select(listen_fd + 1, &ready_set, NULL, NULL, NULL);
if (FD_ISSET(STDIN_FILENO, &ready_set))
command();
if (FD_ISSET(listen_fd, &ready_set)) {
fprintf(stderr, "new connection\n");
client_len = sizeof(client_addr);
client_fd = accept(listen_fd, (SA *)&client_addr, &client_len);
echo(client_fd);
}
close(client_fd);
}
}
void command()
{
char buf[MAXLINE];
if (!fgets(buf, MAXLINE, stdin))
exit(0);
fprintf(stderr, "from console: %s", buf);
}
现在这个服务器可以同时等待两个 I/O 事件了,但是它还有很多缺点。
并发服务器
I/O 多路复用可以用作并发事件驱动服务器程序的基础,在事件驱动程序中,某些事件会导致流向前推进。一般的思想是将逻辑流模型化为状态机。状态机
一般指有限状态机 FSM 是一种数学模型,它包含四部分要素:状态,事件,动作,转移。
对于每个新的客户端 k,服务器会创建一个新的状态机 sk,并将它和已连接描述符 dk 联系起来。在任何时刻,状态机 sk 都处于某个确定的状态下,当事件发生时,可能会有一些动作被执行,并且状态机的状态可能被转移到另一个状态。
具体来说,服务器借助 select 函数来检测事件的发生,当每个已连接描述符准备好可读时,服务器就为相应的状态机执行转移:
typedef struct {
int maxi; /* high water index into client array */
int maxfd; /* largest fd in read_set */
int nready; /* number of ready fd from select */
fd_set read_set; /* set of all active fds */
fd_set ready_set; /* subset of fds ready for reading */
int clientfd[FD_SETSIZE]; /* set of active fds */
} pool;
extern void init_pool(int listen_fd, pool *pool);
extern void add_client(int fd, pool *pool);
extern void check_clients(pool *pool);
在本例中,服务器首先创建一个 pool 结构并调用 init_pool 进行初始化。随后服务器进入无限循环,检测读集合中的事件:如果是监听套接字上发生事件,那么就调用 add_client 函数处理。然后调用 check_clients 函数,该函数主要检测是否有客户端消息,有则处理。
void init_pool(int listen_fd, pool *pool)
{
pool->maxi = -1;
for (int i = 0; i < FD_SETSIZE; i++)
pool->clientfd[i] = -1;
pool->maxfd = listen_fd;
FD_ZERO(&pool->read_set);
FD_SET(listen_fd, &pool->read_set);
}
void add_client(int fd, pool *pool)
{
pool->nready--;
for (int i = 0; i < FD_SETSIZE; i++)
if (pool->clientfd[i] < 0) {
/* add client fd to the pool */
pool->clientfd[i] = fd;
FD_SET(fd, &pool->read_set);
if (fd > pool->maxfd)
pool->maxfd = fd;
if (i > pool->maxi)
pool->maxi = i;
break;
}
}
void check_clients(pool *pool)
{
int client_fd, n;
char buf[MAXLINE];
for (int i = 0; (i <= pool->maxi) && (pool->nready > 0); i++) {
client_fd = pool->clientfd[i];
if (client_fd > 0 && FD_ISSET(client_fd, &pool->ready_set)) {
pool->nready--;
if ((n = read(client_fd, buf, MAXLINE - 1)) > 0) {
/* write back to client */
write(client_fd, buf, n);
/* print msg into screen */
buf[n] = 0;
fprintf(stderr, "from client: %s", buf);
} else { /* EOF or Errors */
close(client_fd);
FD_CLR(client_fd, &pool->read_set);
pool->clientfd[i] = -1;
}
}
}
}
clientfd 数组记录了已连接的描述符集合 (-1 代表空),maxi 指明了其中的最大索引。
服务器代码:
#include "assist.h"
#include <stdio.h>
#include <stdlib.h>
#include <sys/socket.h>
int main(int argc, char **argv)
{
int listen_fd, client_fd;
socklen_t client_len;
static pool pool;
struct sockaddr_storage client_addr;
char client_hostname[MAXLINE], client_port[MAXLINE];
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
listen_fd = open_listen_fd(argv[1]);
if (listen_fd == -1) {
fprintf(stderr, "cannot create a listen socket\n");
exit(1);
}
init_pool(listen_fd, &pool);
for (;;) {
pool.ready_set = pool.read_set;
pool.nready = select(pool.maxfd + 1, &pool.ready_set, NULL, NULL, NULL);
if (FD_ISSET(listen_fd, &pool.ready_set)) {
client_len = sizeof(client_addr);
client_fd = accept(listen_fd, (SA *)&client_addr, &client_len);
fprintf(stderr, "new connection\n");
add_client(client_fd, &pool);
}
check_clients(&pool);
}
}
比较和优劣
事件驱动设计的一个优点是:它比基于进程的设计给程序员更多的对程序行为的控制。另一个优点是,一个基于 I/O 多路复用的事件驱动服务器是运行在单一进程上下文中的,因此每个逻辑流都能访问该进程的全部地址空间。这使得在流之间共享数据变得很容易。此外,单进程程序还更便于调试。
事件驱动设计的一个明显缺点就是编码复杂,并且随着粒度的减小,复杂性还会上升。这里的粒度指:每个逻辑流每个时间片执行的指令数量。
基于线程
线程就是运行在进程上下文中的逻辑流。现代系统允许一个进程里同时运行多个线程,线程是由内核自动调度的。每个线程都有他自己的线程上下文,包括一个唯一的整数线程 ID,栈,栈指针,程序计数器,通用寄存器和条件码。所有运行在一个进程里的线程共享该进程的整个虚拟地址空间。
基于线程的逻辑流结合了基于进程和基于 I/O 多路复用的流的特性。同进程一样,线程由内核自动调度,并且内核通过线程 ID 来识别线程。同基于 I/O 多路复用技术的流一样,多个线程运行在单一进程的上下文中,因此共享这个进程虚拟地址空间的所有内容,包括代码,数据,共享库,打开文件等。
线程执行模型
多线程的执行模型在某些方面和多进程的相似。每个进程开始生命周期时都是单一线程,这个线程叫做主线程
。在某一时刻,主线程创建一个对等线程
,从这个时间点开始,两个线程就并发的开始运行。
在一些重要的方面,线程的执行是不同于进程的。线程的上下文比进程的上下文小很多,因此线程的上下文切换要比进程快得多。此外,线程不像进程一样按照严格的父子层次来组织。和一个进程相关的线程组成一个对等线程池 (对等池),独立于其他线程创建的线程。主线程和其他线程的区别仅在于它总是进程中第一个运行的线程。
Posix 线程
Posix 线程是在 C 程序中处理线程的一个标准接口。Pthreads 定义了大约 60 个函数,允许程序创建,杀死和回收线程,与对等线程安全的共享数据,还可以通知对等线程系统状态的变化。
创建线程
线程通过调用 pthread_create 函数来创建其他线程。
int pthread_create(pthread_t *tid, pthread_attr_t *attr, func* f, void *arg);
pthread_create 函数创建一个新的线程,并带着一个输入变量 arg,在新线程上运行线程例程 f,能用 attr 参数来改变创建线程的默认属性。
pthread_create 函数返回时,参数 tid 包含新创建线程的 ID。新线程可以通过 pthread_self 函数来获得自己的线程 ID:
pthread_t pthread_self(void);
终止线程
一个线程以下列方式之一来终止:
- 顶层的线程例程返回时,线程会隐式终止。
- 通过调用 pthread_exit 函数,线程会显示的终止。如果主线程调用 pthread_exit,它会等待所有其他对等线程终止,然后再终止主线程和整个进程。返回值为 pthread_return。
void pthread_return(void *thread_return);
-
如果某个对等线程调用 exit 函数,该函数会终止进程以及所有与进程相关的线程。
-
另一个对等线程通过以当前线程 ID 作为参数调用 pthread_cancel 函数来终止当前线程。
int pthread_cancel(pthread_t tid);
回收线程资源
线程通过调用 pthread_join 函数等待其他线程终止。
int pthread_join(pthread_t tid, void **thread_return);
pthread_join 函数会阻塞,直到线程 tid 终止,将线程例程返回的通用指针赋值到 thread_return 指向的位置,然后回收已终止线程占用的所有内存资源。
和 Linux 的 wait 函数不同,pthread_join 函数只能等待一个指定的线程终止。没有办法让 pthread_join 等待任意一个线程终止。
分离线程
在任何一个时间点上,线程是可结合的或者是分离的。一个可结合的线程能够被其他线程回收和杀死,在被其他线程回收之前,它的内存资源 (例如栈) 是不释放的。相反,一个分离的线程是不能被其他线程回收或杀死的。它的内存资源在它终止时,由系统自动释放。
默认情况下,线程被创建成可结合的。为了避免内存泄漏,每个可结合线程都应该要么被其他线程显示回收,要么通过调用 pthread_detach 函数被分离。
int pthread_detach(pthread_t tid);
pthread_detach 函数分离可结合线程 tid,线程可以通过以 pthread_self() 为参数分离自己。
初始化线程
pthread_once 函数允许初始化与线程例程相关的状态。
pthread_once_t once_control = PTHREAD_ONCE_INIT;
int pthread_once(pthread_once_t *once_control, void (*init_routine)(void));
once_control 是一个全局或者静态变量,并且总是被初始化为 PTHREAD_ONCE_INIT。当第一次调用 pthread_once 时,init_routine 会被调用执行。当 pthread_once 被第二次执行时,它会立即返回。这在多线程环境中保证初始化代码只被执行一次很有用。
并发服务器
基于线程的并发服务器代码很简单:
#include "assist.h"
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/socket.h>
int main(int argc, char **argv)
{
int listen_fd, *client_fd;
socklen_t client_len;
struct sockaddr_storage client_addr;
char client_hostname[MAXLINE], client_port[MAXLINE];
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
listen_fd = open_listen_fd(argv[1]);
if (listen_fd == -1) {
fprintf(stderr, "cannot create a listen socket\n");
exit(1);
}
pthread_t tid;
for (;;) {
client_len = sizeof(client_addr);
client_fd = malloc(sizeof(int));
*client_fd = accept(listen_fd, (SA *)&client_addr, &client_len);
pthread_create(&tid, NULL, thread, client_fd);
}
}
thread 函数的内容也很简单:
void *thread(void *argp)
{
int client_fd = *((int *)argp);
/* release resource */
free(argp);
pthread_detach(pthread_self());
fprintf(stderr, "new connection\n");
echo(client_fd);
close(client_fd);
return NULL;
}
多线程共享变量
使用一个示例程序来说明多线程程序中共享变量的问题:
#define N (2)
void *thread(void *argp);
char **ptr;
int main()
{
pthread_t tid;
char *msg[N] = {
"hello from foo",
"hello from bar"
};
ptr = msgs;
for (int i = 0; i < N; i++)
pthread_create(&tid, NULL, thread, (void *)i);
pthread_exit(NULL);
}
void *thread(void *argp)
{
int myid = (int)argp;
static int cnt = 0;
printf("[%d]: %s (cnt=%d)\n", myid, ptr[myid], ++cnt);
return NULL:
}
线程内存模型
一组并发线程运行在一个进程的上下文中。每个线程都有自己独立的线程上下文,包括线程 ID,栈,栈指针,程序计数器,条件码以及通用寄存器值。每个线程和其他线程一起共享进程上下文的剩余部分,包括整个用户虚拟地址空间。线程也共享相同的打开文件的集合。
从实际操作角度来说,一个线程读写另一个线程的寄存器值是不可能的。另一方面,因为所有线程共享虚拟内存的任意位置,如果某个线程修改了一个内存位置,那么其他每个线程最终都能在它读这个位置时发现这个变化。因此,寄存器是从不共享的,而虚拟内存总是共享的。
各自独立的线程栈内存模型不是那么整齐清晰,这些线程栈被保存在虚拟地址空间中的栈区域中,通常是被相应的线程独立的访问的。不同的线程栈是不对其他线程设防的,所以,如果一个线程以某种方式得到一个指向其他线程栈的指针,那么它就可以读写这个栈的任何部分。
变量映射内存
多线程的 C 程序中变量根据他们的存储类型被映射到内存:
- 全局变量:如变量 ptr。在运行时,全局变量只有一个实例,任何线程都能访问。
- 本地自动变量:如变量 myid。每个线程栈都包含它自己的所有本地自动变量的实例。
- 本地静态变量:如变量 cnt。虚拟内存中只包含一个本地静态变量的实例,每个线程都能访问。
共享变量
当且仅当变量 v 的一个实例被一个以上的线程引用时,变量 v 是共享的。如 ptr 和 cnt。
信号量同步线程
共享变量很方便,但是它也引入了同步错误的可能性:
void *thread(void *argp);
volatile long cnt = 0;
int main()
{
long niters;
pthread_t tid1, tid2;
niters = atoi(argv[1]);
pthread_create(&tid1, NULL, thread, &niters);
pthread_create(&tid2, NULL, thread, &niters);
pthread_join(&tid1, NULL);
pthread_join(&tid2, NULL);
if (cnt != 2 * niters)
printf("BOOM! cnt = %ld\n", cnt);
else
printf("OK cnt = %ld\n", cnt);
}
void *thread(void *argp)
{
long niters = *((long *)argp);
for (long i = 0; i < niters; i++)
cnt++;
return NULL;
}
经过多次运行发现,每次得到的结果都不同,并且都是错误的。观察 thread 中循环部分的汇编代码:
可以将线程 i 的循环代码分解成五个部分:
- Hi:在循环头部的指令块。
- Li:加载共享变量 cnt 到寄存器 %rdxi (%rdxi 表示线程 i 中的寄存器值)。
- Ui:更新 %rdxi 的指令。
- Si:将 %rdxi 的更新值存回共享变量 cnt 的指令。
- Ti:循环尾部的指令块。
当上面的程序的两个对等线程在一个单处理器上并发运行时,机器指令以某种顺序一个接一个的完成。因此,每个并发执行定义了两个线程中的指令的某种全序 (或者交叉)。这些顺序中的一些将会产生正确的结果,但是其他的则不会。
关键点在于:一般而言,没有办法预测操作系统是否为线程选择一个正确的顺序。
进度图
进度图将 n 个并发线程的执行模型化为一条 n 维笛卡尔空间中的轨迹线。每条轴 k 对应线程 k 的进度。每个点 (I1, I2, …, In) 代表线程已经完成了指令 Ik 这一状态。原点代表没有任何线程完成一条指令的初始状态。
上图中水平轴对应线程 1,垂直轴对应线程 2。一个程序的执行历史被模型化为状态空间中的一条轨迹线:
对于线程 i,操作共享变量 cnt 内容的指令 (Li, Ui, Si) 构成了一个关于 cnt 的临界区
,这个临界区不应该和其他线程的临界区交替执行。换句话说,必须确保每个线程在执行它的临界区中的指令时,拥有对共享变量的互斥访问
。通常这种现象称为互斥
。
在进度图中,两个临界区的交集形成的状态空间区域称为不安全区
。注意:不安全区和与它交界的状态紧连,但是不包括这些状态。绕开不安全区的轨迹线叫做安全轨迹线
,相反,接触到任何不安全区的轨迹线叫做不安全轨迹线
。
任何安全轨迹线都将正确的更新共享计数器。
信号量
信号量是一种经典的解决同步不同执行线程问题的方法。信号量 s 是具有非负整数值的全局变量,只能有两种特殊的操作来处理,这两种操作称为 P 和 V:
- P(s):如果 s 非 0,那么 P 将 s 减 1,并立即返回。如果 s 为 0,那么就挂起这个线程,直到 s 变为 0,而一个 V 操作将会重启这个线程。在重启之后,P 操作将 s 减 1,并将控制返回调用者。
- V(s):V 操作将 s 加 1。如果有任何线程阻塞在 P 操作等待 s 变成非 0,就会重启这些线程中的一个,然后该线程将 s 减 1,完成它的 P 操作。
P 中的测试和减 1 操作是不可分割的,也就是说:一旦检测到 s 变为非 0,就会将 s 减 1,不能有中断。V 中的加 1 操作也是不可分割的,也就是:加载,加 1,存储的过程没有中断。注意:V 的定义中没有定义等待线程被重启的顺序。因此,当有多个线程在等待同一个信号量时,不能预测 V 操作要重启哪个线程。
P 和 V 的定义确保一个正确初始化的信号量不会有负值,这个属性称为信号不变性
。Posix 标准定义了许多操作信号量的函数:
int sem_init(sem_t *sem, 0, unsigned int value);
int sem_wait(sem_t *s); /* P(s) */
int sem_post(sem_t *s); /* V(s) */
sem_init 函数将信号量 sem 初始化为 value。每个信号量在使用前必须初始化。
实现互斥
信号量提供了一种很方便的方法来确保对共享变量的互斥访问。基本思想是将每个共享变量 (或一组相关的共享变量) 与一个信号量 s (初始值为 1) 关联起来。然后用 P(s) 和 V(s) 操作将相应的临界区包围起来。
因为信号量的值总是 0 或者 1,所以这种信号量也叫做二元信号量
,通常也称为互斥锁
。在一个互斥锁上执行 P 操作称为加锁,执行 V 操作称为解锁。一个被用作一组可用资源的计数器的信号量称为计数信号量
。
上面标注了每个状态下信号量的值,P 和 V 的结合操作创建了一组禁止区 (s < 0) 禁止区包含不安全区,因为信号量的不可变性,没有实际可能的轨迹线能够接触到不安全区的任何部分。
利用互斥锁可以很轻松的解决之前的问题:
volatile long cnt = 0;
sem_t mutex;
/* in main */
int main()
{
sem_init(&mutex, 0, 1);
/* ... */
}
/* in thread */
void *thread(void *argp)
{
/* ... */
for (long i = 0; i < niters; i++) {
P(&mutex);
cnt++;
V(&mutex);
}
}
TODO 调度资源
待更新。
多线程服务器
所谓的预线程化其实就是线程池技术,线程池的主要思想就是在服务器启动时就创建一组工作线程,他们永不停息的工作。并且利用之前的生产者 - 消费者模型,主线程负责处理客户端连接,并将已连接描述符发送给线程池中的工作线程。
这么做的好处是:避免了每次有一个新客户端就要创建一个线程的开销,并且避免了短连接导致反复创建销毁线程的浪费,提高了服务器的吞吐量。
使用 SBUF 包来实现一个线程池化的并发 echo 服务器。代码很简单,这里加入了一点新变动:echo_cnt 中包含了对所有客户端发送来的字节数的统计,虽然实际上它没什么用处,但是它使用了 pthread_once 进行初始化。
#include "assist.h"
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#include <sys/socket.h>
sbuf_t sbuf;
int main(int argc, char **argv)
{
pthread_t tid;
socklen_t client_len;
int listen_fd, client_fd;
struct sockaddr_storage client_addr;
char client_hostname[MAXLINE], client_port[MAXLINE];
if (argc != 2) {
fprintf(stderr, "usage: %s <port>\n", argv[0]);
exit(0);
}
listen_fd = open_listen_fd(argv[1]);
if (listen_fd == -1) {
fprintf(stderr, "cannot create a listen socket\n");
exit(1);
}
sbuf_init(&sbuf, BUFSIZE);
for (int i = 0; i < THREADS; i++)
pthread_create(&tid, NULL, thread2, &sbuf);
for (;;) {
client_len = sizeof(client_addr);
client_fd = accept(listen_fd, (SA *)&client_addr, &client_len);
sbuf_insert(&sbuf, client_fd);
fprintf(stderr, "new connection\n");
}
close(listen_fd);
}
echo_cnt 函数和 thread2 函数的内容:
static int byte_cnt;
static sem_t mutex;
static void init_echo_cnt(void)
{
sem_init(&mutex, 0, 1);
byte_cnt = 0;
}
void echo_cnt(int client_fd)
{
int nread;
char buf[MAXLINE];
static pthread_once_t once = PTHREAD_ONCE_INIT;
pthread_once(&once, init_echo_cnt);
while ((nread = read(client_fd, buf, MAXLINE - 1)) > 0) {
buf[nread] = 0;
sem_wait(&mutex);
byte_cnt += nread;
fprintf(stderr, "from client: %s (%d / %d)", buf, nread, byte_cnt);
sem_post(&mutex);
/* write back to client */
write(client_fd, buf, nread);
}
}
void *thread2(void *sbuf)
{
pthread_detach(pthread_self());
for (;;) {
int fd = sbuf_remove((sbuf_t *)sbuf);
echo_cnt(fd);
close(fd);
}
}
TODO 线程提高并行
之前所有的例子都假设并发线程是在单处理器系统上执行的,然而,大多数现代机器具有多核处理器。并发程序通常在这样的机器上运行的更快,因为操作系统内核会在多个核上并行的调度这些并发线程。
将任务分配到不同的线程最直接的方法是将序列划分成 t 个不相交的区域,然后给 t 个不同的线程每个分配一个区域。一个简单的例子是求和一列整数 0, 1 …, n - 1。最后赋值给全局变量 gsum。
待更新。
并行程序性能
之前 psum-local 的运行时间如图,它是一个关于线程数量的函数:随着线程数的增加,运行时间下降,知道增加到四个线程 (运行在四核处理器上),此时,运行时间趋于平稳,甚至开始有点下降。
通常,使用加速比
来衡量并行程序的性能:Sp = T1 / Tp (这个公式也被称为强扩展
)。其中 p 是处理器核心数量,Tp 是程序在 p 个核心上的运行时间,T1 是顺序程序的运行时间。Sp 称为绝对加速比
。当 T1 是并行版本程序在一个核心上的运行时间时,Sp 称为相对加速比
。
一种相关的测量量称为效率
:Ep = Sp / p。效率范围通常在 (0, 100],效率用于测量由并行化造成的开销。具有高效率的程序比低效率的程序花费更多时间在有用的工作上。换句话说,低效率的程序在同步和通信上开销更大。
其他并发问题
一旦要求同步对共享数据的访问,那么事情就变得复杂起来。同步从根本上说是很难的问题,它引出了在普通的顺序程序中完全不会出现的问题。
线程安全
一个函数被称为线程安全
的,当且仅当被多个并发线程反复的调用时,它会一直产生正确的结果。如果一个函数不是线程安全的,就说它是线程不安全
的。
可以定义四类不相交的线程不安全函数类:
- 不保护共享变量的函数
在信号量同步线程开始时写到的 thread 函数就是这类,该函数对一个未受保护的全局计数器变量加 1。将这类函数变成线程安全的,相对而言比较简单:利用像 P V 操作这样的同步操作来保护共享变量。这个方法的优点是在调用程序中不需要做任何修改,缺点是同步操作将减慢程序的执行时间。
- 保持跨越多个调用的状态的函数
一个伪随机数生成器是这类线程不安全函数的例子:
unsigned next_seed = 1;
unsigned rand(void)
{
next_seed = next_seed * 1103515245 + 12543;
return (unsigned)(next_seed >> 16) % 32768;
}
void srand(unsigned new_seed)
{
next_seed = new_seed;
}
rand 函数是线程不安全的,因为当前调用的结果依赖于前两次调用的中间结果。简单来说:当调用 srand 设置了一个种子后,在一个单线程中反复调用 rand,就能得到一个可重复的随机数字序列。然而,如果多线程调用 rand 函数,这种假设就不成立了。
使得像 rand 这样的函数线程安全的唯一方式是重写它,使得它不再使用任何 static 数据,而是依靠调用者在参数中传递状态信息。这样做的缺点是,程序员现在被迫要修改调用程序中的代码。
- 返回指向静态变量指针的函数
例如 ctime 和 gethostbyname 函数,他们将计算结果放在一个 static 变量中,然后返回一个指向这个变量的指针。如果从并发线程中调用这些函数,那么可能会发生数据覆盖。
解决办法有两种:一种是选择重写这类函数,让调用者传递存放结果的地址。这就消除了共享数据,但是这种办法要求程序员能够修改函数的源代码。
另一种方法是使用加锁 - 复制技术。基本思想是将线程不安全的函数与互斥锁联系起来:在每一个调用位置,对互斥锁加锁,调用线程不安全的函数,将函数返回的结果复制到一个私有的内存位置,然后对互斥锁解锁。为了减少对调用者的修改,应该定义一个线程安全的包装函数,它执行加锁 - 复制,然后通过调用这个包装函数来取代所有对线程不安全函数的调用,一个 ctime 的线程安全包装如下:
char *ctime_ts(const time_t *timep, char *privatep)
{
char *sharedp;
P(&mutex);
sharedp = ctime(timep);
strcpy(privatep, sharedp);
V(&mutex);
return privatep;
}
- 调用线程不安全函数的函数
如果函数 f 调用线程不安全函数 g,有两种情况:如果 g 是第 2 类线程不安全函数,那么除了重写 g 以外没什么方法,所以 f 也是线程不安全的函数。如果 g 是第 1 或 3 类线程不安全函数,那么只要用一个互斥锁保护调用位置和得到的任何共享数据,f 仍可能是一个线程安全的函数。
可重入性
可重入函数是一类重要的线程安全的函数,它不依赖于外部状态,因此可以在同一个线程的多个调用栈中安全使用。一个重要特点是:可重入函数不引用任何共享数据。
通常来说,可重入函数要比不可重入的线程安全函数更高效,因为他们不引用外部共享数据,所以不需要做同步操作。一个可重入版本的 rand 函数:
int rand_r(unsigned int *nextp)
{
*nextp = *nextp + 1103515245 + 12345;
return (unsigned int)(*nextp / 65535) % 32768;
}
可重入函数有两类,都统称为可重入函数:显示可重入的和隐式可重入的。显示可重入函数要求函数参数都是传值传递的 (没有指针),并且所有数据引用都是本地的自动栈变量 (不引用静态或者全局变量)。隐式可重入函数在显示的基础上放松了一点要求:允许函数参数中一些参数是引用传递的 (允许指针),如果调用线程小心的传递指向非共享数据的指针,那么它是可重入的。
使用库函数
大多数 Linux 函数,包括定义在标准 C 中的函数 (malloc, free, realloc, printf, scanf…) 都是线程安全的,只有一小部分是例外 (只列一部分):
除了 rand 和 strtok 以外,其他的都是第 3 类线程不安全函数,他们返回一个静态变量的指针。可以使用加锁 - 复制技术写一个包装函数,但是这会因为同步拖慢程序的速度。但是加锁 - 复制技术对 rand 这类第 2 类线程不安全函数并无效果。
因此 Linux 提供大多数线程不安全函数的可重入版本,可重入版本总是以 _r 后缀结尾。
竞争
当一个程序的正确性依赖于一个线程要在另一个线程到达 y 点之前到达它的控制流中的 x 点时,就会发生竞争。一个简单的例子:
#define N (4)
void *thread(void *vargp);
int main()
{
pthread_t tid[N];
for (int i = 0; i < N; i++)
pthread_create(&tid[i], NULL, thread, &i);
for (int i = 0; i < N; i++)
pthread_join(tid[i], NULL);
}
void *thread(void *vargp)
{
int myid = *((int *)vargp);
printf("Hello from thread %d\n", myid);
return NULL;
}
这个程序正确的前提是:对等线程执行第 15 行的时机必须在主线程创建线程后执行第 8 行更新 i 之前。问题在于,这个时机是由内核调度决定的,所以这个程序发生了竞争。一个解决办法是使用动态分配为每个线程分配一个独立的 id,但是这些块必须及时回收:
int main()
{
/* ... */
for (int i = 0; i < N; i++) {
int *ptr = malloc(sizeof(int));
*ptr = i;
pthread_create(&tid[N], NULL, thread, ptr);
}
/* ... */
}
void *thread(void *vargp)
{
int myid = *((int *)vargp);
free(vargp);
/* ... */
}
死锁
死锁是信号量引入的一种潜在的运行时错误。它指一组线程被阻塞了,等待一个永远也不会为真的条件。
- 由于 P V 操作顺序不当,两个信号量的禁止区域重叠。如果某个轨迹线到达了死锁状态 d,那么就不可能有进一步的进展了。因为重叠的禁止区域阻塞了各个合法方向上的进展。
- 重叠的禁止区域引起了一组称为死锁区域的状态,如果一个轨迹线到达了死锁区域,那么它将不可能离开。
- 死锁困难之处在于它是不可预测的,也许前 1000 次程序都运行正常,但是下一次就发生死锁。
当使用二元信号量来实现互斥时,可以使用一种简单有效的规则避免死锁:
互斥加锁顺序规则:给定所有互斥操作的一个全序,如果每个线程都是以一种顺序获得互斥锁并且以相反的顺序释放,那么这个程序就是无死锁的。
19:30 Nov 15 2023.