1 理解线程
1.1 引入线程背景
多进程模型有如下缺陷:
- 创建进程的过程会带来一定的开销。
- 为了完成进程间的数据交换,需要特殊的IPC技术
- 每秒少则数十次,多则数千次的‘上下文切换’是创建进程时最大的开销
运行程序前需要将相应进程信息读入内存,如果运行进程A后需要紧接着运行进程B,就应该将进程A相关信息移出内存,并读入进程B相关信息。这就是上下文切换。但此时进程A的数据也将被移动到硬盘,所以上下文切换需要很长时间。即使通过优化加快速度,也会存在一定的局限。
为了保持多进程的优点,同时在一定程度上克服其缺点,引入了线程,线程具有以下优点。
- 线程的创建和上下文切换比进程的创建和上下文切换更快
- 线程间交换数据无需特殊技术
1.2 线程和进程的差异
每个进程的内存空间都由保存全局变量的“数据区”,向malloc等函数的动态分配提供空间的堆(Heap)、函数运行时使用的(Stack)构成,每个进程都拥有这种独立空间。
但是如果只是想获得多个代码执行流,则不需要将内存完全分离(对系统负担很大),只需要分离栈区域。这样做有以下优势。
- 上下文切换时不需要切换数据区和堆
- 可以利用数据区和堆交换数据
多个线程将共享数据区和堆。为了保持这种结构,线程将在进程内创建并运行,也就是说可以将进程和线程定义为如下形式:
- 进程:在操作系统构成单独执行流的单位
- 线程:在进程构成单独执行流的单位
操作系统、进程、线程的关系如下:
2 线程创建和运行
POSIX是Portable Operating System lnterface for Computer Environment (适用于计算机环境的
可移植操作系统接口)的简写,是为了提高UNIX系列操作系统间的移植性而制定的API规范。下
面要介绍的线程创建方法也是以POSIX标准为依据的。因此,它不仅适用于Linux ,也适用于大
部分UNIX系列的操作系统。
2.1 线程的创建和执行流程
#include <pthread.h>int pthread_create(pthread_t* restrict thread, //保存新创建线程ID的变量地址值const pthread_attr_t* restrict attr, //传递线程属性的参数,传递NULL时,创建默认属性线程void* (*start_routine)(void*), //在单独执行流中执行的函数地址值(函数指针)void* restrict arg);
注意编译时添加-lpthread |
#include <stdio.h>
#include <pthread.h>void* thread_main(void* arg) {int i = 0;int cnt = *((int*)arg);for ( ; i < cnt; i++) {sleep(1);puts("running thread");}return NULL;
}int main(int argc, char* argv[]) {pthread_t t_id;int thread_param = 5;if (pthread_create(&t_id, NULL, thread_main, (void*)&thread_param) != 0) {puts("thread_create error");return -1;}sleep(10);//保证子线程先于主线程结束puts("end of main");return 0;
}
如果将sleep(10)
改成sleep(2)
,则不会输出5次running thread
,因为main函数返回后整个进程将被销毁。但是我们通过sleep预测程序的执行流程的行为,有可能干扰程序的正常执行流。
为了更好地控制线程的执行流,需要了解下面的函数:
#include <pthread.h>
//status 保存线程的main函数返回值的指针变量地址值
int pthread_join(pthread_t thread, void** status);
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>void* thread_main(void* arg) {int cnt = *((int*)arg);char* msg = (char*)malloc(sizeof(char)*50);strcpy(msg, "hello, i am thread~ \n");for (int i = 0; i < cnt; i++) {sleep(1);puts("running thread");}return (void*)msg;
}int main(int argc, char* argv[]) {pthread_t t_id;int thread_param = 5;void* thr_ret;if (pthread_create(&t_id, NULL, thread_main, (void*)&thread_param) != 0) {puts("thread_create error");return -1;}if(pthread_join(t_id, &thr_ret) != 0) {puts("pthread_join error");return 0;}printf("thread return message: %s \n", (char*)thr_ret);free(thr_ret);return 0;
}
2.2 可在临界区内调用的函数
多个函数同时执行临界区代码时可能会产生问题。根据临界区是否引起问题,函数分为2类。
- 线程安全函数(被多个线程调用也不会引发问题)
- 非线程安全函数
_REENTRANT宏的作用
大多数标准函数都是线程安全的函数。更幸运的是,平台在定义非线程安全函数的同时,也提供了线程安全的函数。比如,域名转IP地址的函数就不是线程安全的
//非线程安全
struct hostent* gethostbyname(const char* hostname);//线程安全
struct hostent* gethostbyname_r(const char* name,struct hostent* result,char* buffer,int buflen,int* h_errnop);
线程安全版本有点麻烦,幸好可以通过声明头文件前定义_REENTRANT
宏将gethostbyname
函数调用改为gethostbyname_r
。或者可以在编译时添加-D REENTRANT选项定义宏
root@my_linux:/tcpip# gcc -D_REENTRANT mythread.c -0 mthread -lpthread
(感觉很少人用,没找到很多帖子,而且相比线程非安全版本更容易遇到问题,不展开了)。
3 多线程存在的问题和临界区
3.1 多个线程访问同一变量的问题
当多个线程同时访问内存空间,都有可能发生问题
不是说线程会分时使用CPU吗? 那应该不会出现同时访问变量的情况啊。
此处的“同时访问”与同一时刻访问有些区别:
假设2个线程要执行变量值逐次加1的工作,如下图:
图中描述的是2个线程准备将变量num值加1的情况。理想状态下,线程1将num加1变成100后,线程2再访问num时,变量num应该变成101。
但是,整个过程不是num+=1就完事了,值的运算需要CPU。首先线程1读取num的值并将其传递给CPU,CPU加1得到100,最后把100写回num变量中,这样就完成了一次理想过程。但是如果再次过程中线程2通过切换得到CPU,那么就需要从99重来。这样两个线程都操作一次后num的是100,而不是101。
因此线程1访问num时应该阻止其他线程访问。这就是同步。
3.2 临界区位置
我们可以对临界区定义为这种形式:
函数内同时运行多个线程时引起问题的多条语句构成的代码块。
注意:全局变量num不是临界区,因为问题不是它引起的,而是对它进行操作的那些语句,通常位于函数内部。
4 线程同步
4.1 同步的两面性
线程同步用于解决线程访问顺序引发的问题。需要同步的情况可以从如下两个方面考虑。
- 同时访问同一内存空间时发生的情况。
- 需要指定访问同一内存控件的线程执行顺序的情况。
重点说明一下第二种情况。假设A B了两个线程,线程A负责向指定内存空间写入数据,线程B负责取走该数据。这种情况下,如果B先于A取走数据,将导致错误结果。接下来介绍互斥量和信号量同步技术。
4.2 互斥量
互斥量(Mutual Exclusion)主要用于解决线程同步访问的问题,表示不允许多个线程同时访问。
现实中,洗手间就是一个很好的例子,人们无法同时访问洗手间,需要排队,而且为了满足无法同时访问,进去的人还需要加锁,互斥量就是一把优秀的锁。
#include <pthread.h>//成功返回0,失败返回其他值
//mutex 互斥量地址 attr 互斥量属性,无特殊需要传NULL
int pthread_mutex_init(pthread_mutex_t* mutex, const pthread_mutexattr_t* attr);
int pthread_mutex_destroy(pthread_mutex_t* mutex);
为了创建相当于锁系统的互斥量,需要声明如下pthread_mutex_t
型变量:
pthread_mutex_t mutex;
注:不要使用PTHREAD-MUTEX-INITALIZER宏进行初始化,发生错误很难发现。(自行了解)
接下来介绍利用互斥量锁住或释放临界区时使用的函数。
#include <pthread.h>int pthread_mutex_lock(pthread_mutex_t* mutex);
//临界区开始
....
//临界区结束
int pthread_mutex_unlock(pthread_mutex_t* mutex);
注意:线程退出临界区时,如果忘了调用pthread_mutex_unlock
那么为了进入临界区调用pthread_mutex_lock
的线程就无法拜托阻塞状态,这种情况称为死锁。
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>
#include <pthread.h>
#define NUM_THREAD 100
#define _REENTRANTlong long num = 0;
pthread_mutex_t mutex;void* thread_inc(void* arg) {pthread_mutex_lock(&mutex);for (int i = 0; i < 50000; i++) {num += 1;}pthread_mutex_unlock(&mutex);return NULL;
}void* thread_des(void* arg) {pthread_mutex_lock(&mutex);for (int i = 0; i < 50000; i++) {num -= 1;}pthread_mutex_unlock(&mutex);return NULL;
}int main(int argc, char* argv[]) {pthread_t thread_id[NUM_THREAD];pthread_mutex_init(&mutex, NULL);for (int i = 0; i < NUM_THREAD; i++) {if (i % 2) {pthread_create(&(thread_id[i]), NULL, thread_inc, NULL);} else {pthread_create(&(thread_id[i]), NULL, thread_des, NULL);}}for (int i = 0; i < NUM_THREAD; i++) {pthread_join(thread_id[i], NULL);}printf(" result : %lld\n", num);pthread_mutex_destroy(&mutex);return 0;
}
注意:尽可能减少lock和unlock的调用次数,因为调用过程很花时间。
4.3 信号量
此处只涉及利用"二进制信号量"(只用0和1)完成"控制线程顺序"为中心的同步方法。下面是信号量创建和销毁方法。
#include <semaphore.h>//成功时返回0,失败时返回其他值
//sem 信号量的变量地址值
//pshared 传递其他值时,创建可以由多个进程共享的信号量;传递0时,创建只允许1个进程内部使用的信号量。
//value 指定新创建的信号量初始值
int sem_init(sem_t* sem, int pshared, unsigned int value);
int sem_destroy(sem_t* sem);
接下来介绍信号量中相当于互斥量lock,unlock的函数。
#include <semaphore.h>//传递保存信号量读取值的变量地址值,传递给sem_post时信号量增l ,传递给sem_wait时信号量减1
int sem_post(sem_t* sem);
int sem_wait(sem_t* sem);
调用sem init函数时,操作系统将创建信号量对象, 此对象中记录着"信号量值" (Semaphore Value) 整数。该值在调用sem_post函数时增1 ,调用sem-mfait函数时减1 。但信号量的值不能小于0 ,因此, 在信号量为0的情况下调用sem-wait函数时, 调用函数的线程将进入阻塞状态(因为函数未返回)。当然, 此时如果有其他线程调用sem_post函数, 信号量的值将变为1 ,而原本阻塞的线程可以将该信号量重新减为0并跳出阻塞状态。实际上就是通过这种特性完成临界区的同步操作,可以通过如下形式同步临界区(假设信号量的初始值为1 )。
sem_wait(&sem); //信号量变为0
//临界区开始
//....
//临界区的结束
sem_post(&sem); //信号量变为1
上述代码结构中,调用sem-wait函数进入临界区的线程在调用sem_post函数前不允许其他线程进入临界区。信号量的值在0和1 之间跳转,因此,具有这种特性的机制称为"二进制信号量"。
我们使用下面例子介绍关于控制访问顺序的同步:
“线程A从用户输入得到值后存入全局变量num ,此时线程B将取走该值并累加。该过程共进行5 次, 完成后输出总和并退出程序。”
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#define _REENTRANTvoid* read(void* arg);
void* accu(void* arg);
static sem_t sem_one;
static sem_t sem_two;
static int num;int main(int argc, char* argv[]) {pthread_t t1, t2;sem_init(&sem_one, 0, 0);sem_init(&sem_two, 0, 1);pthread_create(&t1, NULL, read, NULL);pthread_create(&t2, NULL, accu, NULL);pthread_join(t1, NULL);pthread_join(t2, NULL);sem_destroy(&sem_one);sem_destroy(&sem_two);return 0;
}void* read(void* arg) {for (int i = 0; i < 5; i++) {fputs("Input num: ", stdout);sem_wait(&sem_two);scanf("%d", &num);sem_post(&sem_one);}return NULL;
}void* accu(void* arg) {int sum = 0;for (int i = 0; i < 5; i++) {sem_wait(&sem_one);sum += num;sem_post(&sem_two);}printf("Result: %d\n", sum);return NULL;
}
5 线程的销毁和多线程并发服务端的实现
5.1 销毁线程的3种方法
Linux线程并不是在首次调用的线程main函数返回时自动销毁,所以必须用下面两个方法之一加以明确,否则由线程创建的空间将一直存在。
- 调用pthread_join函数
- 调用pthread_detach函数
pthread_join
函数调用时,不仅会等待线程终止,还会引导线程销毁。但该函数的问题是,线程终止前,调用该函数的线程会进入阻塞状态。因此,通常调用如下函数引导线程销毁。
#include <pthread.h>
//分离线程,相当于“两者没关系了”,两者真正的“并行”执行,子线程执行完自行回收
int pthread_detach(pthread_t thread);
调用上述函数不会引起线程中止或者进入阻塞状态。注意:两者不能一起使用。(不过我遇到的C++业务中一般会使用pthread_join(C++是join),因为需要使用多线程的操作往往成对出现(开始和结束),所以线程回收写进结束函数(或者析构函数),释放时机由自己把控)。
5.2 多线程并发服务端的实现
chat_server.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>#define BUF_SIZE 100
#define MAX_CLNT 256void * handle_clnt(void * arg);
void send_msg(char * msg, int len);
void error_handling(char * msg);int clnt_cnt=0;
int clnt_socks[MAX_CLNT];
pthread_mutex_t mutx;int main(int argc, char *argv[])
{int serv_sock, clnt_sock;struct sockaddr_in serv_adr, clnt_adr;int clnt_adr_sz;pthread_t t_id;if(argc!=2) {printf("Usage : %s <port>\n", argv[0]);exit(1);}pthread_mutex_init(&mutx, NULL);serv_sock=socket(PF_INET, SOCK_STREAM, 0);memset(&serv_adr, 0, sizeof(serv_adr));serv_adr.sin_family=AF_INET; serv_adr.sin_addr.s_addr=htonl(INADDR_ANY);serv_adr.sin_port=htons(atoi(argv[1]));if(bind(serv_sock, (struct sockaddr*) &serv_adr, sizeof(serv_adr))==-1)error_handling("bind() error");if(listen(serv_sock, 5)==-1)error_handling("listen() error");while(1){clnt_adr_sz=sizeof(clnt_adr);clnt_sock=accept(serv_sock, (struct sockaddr*)&clnt_adr,&clnt_adr_sz);pthread_mutex_lock(&mutx);//添加套介子到数组clnt_socks[clnt_cnt++]=clnt_sock;pthread_mutex_unlock(&mutx);pthread_create(&t_id, NULL, handle_clnt, (void*)&clnt_sock);pthread_detach(t_id);printf("Connected client IP: %s \n", inet_ntoa(clnt_adr.sin_addr));}close(serv_sock);return 0;
}void * handle_clnt(void * arg)
{int clnt_sock=*((int*)arg);int str_len=0, i;char msg[BUF_SIZE];while((str_len=read(clnt_sock, msg, sizeof(msg)))!=0)send_msg(msg, str_len);pthread_mutex_lock(&mutx);for(i=0; i<clnt_cnt; i++) // remove disconnected client{if(clnt_sock==clnt_socks[i]) //删除当前的连接,后边的统一往前移动{while(i++<clnt_cnt-1)clnt_socks[i]=clnt_socks[i+1]; break;}}clnt_cnt--;pthread_mutex_unlock(&mutx);close(clnt_sock);return NULL;
}
void send_msg(char * msg, int len) // send to all
{int i;pthread_mutex_lock(&mutx);for(i=0; i<clnt_cnt; i++)write(clnt_socks[i], msg, len);pthread_mutex_unlock(&mutx);
}
void error_handling(char * msg)
{fputs(msg, stderr);fputc('\n', stderr);exit(1);
}
chat_client.c
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <pthread.h>#define BUF_SIZE 100
#define NAME_SIZE 20void * send_msg(void * arg);
void * recv_msg(void * arg);
void error_handling(char * msg);char name[NAME_SIZE]="[DEFAULT]";
char msg[BUF_SIZE];int main(int argc, char *argv[])
{int sock;struct sockaddr_in serv_addr;pthread_t snd_thread, rcv_thread;void * thread_return;if(argc!=4) {printf("Usage : %s <IP> <port> <name>\n", argv[0]);exit(1);}sprintf(name, "[%s]", argv[3]);sock=socket(PF_INET, SOCK_STREAM, 0);memset(&serv_addr, 0, sizeof(serv_addr));serv_addr.sin_family=AF_INET;serv_addr.sin_addr.s_addr=inet_addr(argv[1]);serv_addr.sin_port=htons(atoi(argv[2]));if(connect(sock, (struct sockaddr*)&serv_addr, sizeof(serv_addr))==-1)error_handling("connect() error");pthread_create(&snd_thread, NULL, send_msg, (void*)&sock);pthread_create(&rcv_thread, NULL, recv_msg, (void*)&sock);pthread_join(snd_thread, &thread_return);pthread_join(rcv_thread, &thread_return);close(sock); return 0;
}void * send_msg(void * arg) // send thread main
{int sock=*((int*)arg);char name_msg[NAME_SIZE+BUF_SIZE];while(1) {fgets(msg, BUF_SIZE, stdin);if(!strcmp(msg,"q\n")||!strcmp(msg,"Q\n")) {close(sock);exit(0);}sprintf(name_msg,"%s %s", name, msg);write(sock, name_msg, strlen(name_msg));}return NULL;
}void * recv_msg(void * arg) // read thread main
{int sock=*((int*)arg);char name_msg[NAME_SIZE+BUF_SIZE];int str_len;while(1){str_len=read(sock, name_msg, NAME_SIZE+BUF_SIZE-1);if(str_len==-1) return (void*)-1;name_msg[str_len]=0;fputs(name_msg, stdout);}return NULL;
}void error_handling(char *msg)
{fputs(msg, stderr);fputc('\n', stderr);exit(1);
}