一、概述
线程是一种轻量级的并发执行的机制。线程是进程中的一个实体,它执行在同一进程的上下文中,共享同一内存空间,但拥有独立的栈空间。
C语言的线程使用pthread库实现,通过包含头文件 pthread.h
来使用相关的函数和数据类型
二、线程处理
1、相关函数
以下是一些常用的线程处理函数及其用法:
pthread_create:用于创建一个新的线程。原型:
int pthread_create(pthread_t *thread, const pthread_attr_t *attr, void *(*start_routine) (void *), void *arg);
参数:
thread:指向线程标识符的指针,用于保存新创建线程的ID。
attr:用于指定线程属性的对象。通常使用默认属性,可以传递NULL。
start_routine:指向线程函数的指针。
arg:传递给线程函数的参数。
返回值:成功时返回0,失败时返回错误码。
pthread_exit:用于终止当前线程并返回一个值。原型:
void pthread_exit(void *value_ptr);
参数:
value_ptr:指向线程返回值的指针。
返回值:无。
pthread_join:用于等待指定的线程终止。原型:
int pthread_join(pthread_t thread, void **value_ptr);
参数:
thread:要等待的线程的ID。
value_ptr:指向线程返回值的指针。
返回值:成功时返回0,失败时返回错误码。
pthread_detach:将线程标记为分离状态,使得线程在退出时自动释放资源,无需其他线程调用pthread_join进行等待。原型:
int pthread_detach(pthread_t thread);
参数:
thread:要分离的线程的ID。
返回值:成功时返回0,失败时返回错误码。
pthread_cancel:取消指定的线程,默认取消类型为延时,直到该线程调用 pthread_testcancel 函数为止。类型修改使用 pthread_setcanceltype 函数。原型:
int pthread_cancel(pthread_t thread);
参数:
thread:要取消的线程的ID。
返回值:成功时返回0,失败时返回错误码。
2、用例
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>void *task(void * argv){printf("thread strat\n");//默认取消类型为延时,到调用 pthread_testcancel 函数为止printf("working\n");sleep(1);pthread_testcancel();printf("arter cancel\n");return NULL;}
int main(int argc, char const *argv[])
{pthread_t pid;pthread_create(&pid,NULL,task,NULL);//取消子线程if(pthread_cancel(pid) != 0){perror("pthread_cancel");}void *res;//等待子线程终止并获取退出状态pthread_join(pid,&res);if (res == PTHREAD_CANCELED){printf("cancel succeed\n");}else{printf("cancel defent\n");}return 0;
}
三、线程同步
只要使用线程就一定会存在竞态。那么什么是竞态呢?
竞态是指当多个线程同时访问和操作共享的数据时,最终的结果依赖于线程执行的具体顺序,而这个顺序是无法确定的,可能会发生错误。比如创建10000个线程对一个全局变量进行累加。由于多个线程并发执行,就可能导致多个线程读到同一个值然后执行 +1 后写回变量中。从而导致最终的累加结果小于10000。
竞态的解决办法就是锁。通过锁住一个线程,让该线程执行的时候不被其他线程打扰,从而解决竞态。
1、互斥锁(Mutex)
①:相关函数
互斥锁是一种同步原语,用于保护共享资源,确保在同一时间只有一个线程可以访问该资源。
相关函数主要包括以下几个:
pthread_mutex_init
函数用于显示初始化互斥锁变量。它的原型如下:
int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr);
其中,mutex
是一个指向互斥锁变量的指针,attr
参数可以用来设置互斥锁的属性,可以传入NULL
使用默认属性。函数成功返回0,失败返回一个错误码。
下面代码用于静态初始化互斥锁,静态初始化可以不调用pthread_mutex_destroy
函数销毁互斥锁。
如果是动态分配或者被跨多个函数或文件使用则需要显式销毁。
static pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_mutex_destroy
函数用于销毁互斥锁变量,释放相关资源。它的原型如下:
int pthread_mutex_destroy(pthread_mutex_t *mutex);
其中,mutex
是一个指向互斥锁变量的指针。函数成功返回0,失败返回一个错误码。
pthread_mutex_lock
函数用于加锁,即获取互斥锁。如果互斥锁已经被其他线程锁定,当前线程将阻塞,直到互斥锁被释放。它的原型如下:
int pthread_mutex_lock(pthread_mutex_t *mutex);
其中,mutex
是一个指向互斥锁变量的指针。函数成功返回0,失败返回一个错误码。
pthread_mutex_unlock
函数用于解锁,即释放互斥锁。它的原型如下:
int pthread_mutex_unlock(pthread_mutex_t *mutex);
其中,mutex
是一个指向互斥锁变量的指针。函数成功返回0,失败返回一个错误码。
互斥锁只能在同一进程的不同线程之间同步,不能用于进程间的同步。在多线程程序中使用互斥锁时,需要保证所有线程使用的是同一个互斥锁变量。
使用互斥锁的基本流程是先初始化互斥锁,然后在需要保护共享资源的代码段前后加锁和解锁操作。
②:用例
该代码实现的是在两万个线程并发执行对同一变量进行累加时使用互斥锁进行保护。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>#define THREAD_COUNT 20000//静态初始化锁 如果是动态分配或者被跨多个函数或文件使用则需要显式销毁
static pthread_mutex_t counter_mutex = PTHREAD_MUTEX_INITIALIZER;//创建多个线程
void * add_thread(void *argv){int *p = (int *)argv;//累加之前获取锁,保证同一时间只有一个线程使用pthread_mutex_lock(&counter_mutex);//pthread_mutex_trylock(不会阻塞,而是返回EBUSY)(*p)++;//释放锁pthread_mutex_unlock(&counter_mutex);return (void *)0;
}int main(int argc, char const *argv[])
{pthread_t pid[THREAD_COUNT];int sum = 0;for (size_t i = 0; i < THREAD_COUNT; i++){//创建的线程的功能是给传入的参数累加1pthread_create(pid+i,NULL,add_thread,&sum);}//等待所有线程结束for (size_t i = 0; i < THREAD_COUNT; i++){pthread_join(pid[i],NULL);}printf("最终结果为 %d\n",sum);return 0;
}
2、读写锁(Read-Write Lock)
①:相关函数
读写锁允许多个线程同时读取共享数据,但只允许一个线程写入共享数据。
相关函数主要包括以下几个:
pthread_rwlock_t
类型:读写锁的类型,在使用前需要进行初始化。
pthread_rwlock_init
函数:用于初始化读写锁。它的原型如下:
int pthread_rwlock_init(pthread_rwlock_t *restrict rwlock, const pthread_rwlockattr_t *restrict attr);
参数rwlock
是指向要初始化的读写锁的指针,attr
是读写锁的属性,可以传递NULL
使用默认属性。函数调用成功时返回0,失败时返回错误码。
下面代码用于静态初始化:
static pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;
pthread_rwlock_destroy
函数:用于销毁读写锁。它的原型如下:
int pthread_rwlock_destroy(pthread_rwlock_t *rwlock);
参数rwlock
是指向要销毁的读写锁的指针。函数调用成功时返回0,失败时返回错误码。
pthread_rwlock_rdlock
函数:用于获取读锁。它的原型如下:
int pthread_rwlock_rdlock(pthread_rwlock_t *rwlock);
参数rwlock
是指向要获取读锁的读写锁的指针。函数调用成功时返回0,失败时返回错误码。
pthread_rwlock_wrlock
函数:用于获取写锁。它的原型如下:
int pthread_rwlock_wrlock(pthread_rwlock_t *rwlock);
参数rwlock
是指向要获取写锁的读写锁的指针。函数调用成功时返回0,失败时返回错误码。
pthread_rwlock_unlock
函数:用于释放读锁或写锁。它的原型如下:
int pthread_rwlock_unlock(pthread_rwlock_t *rwlock);
参数rwlock
是指向要释放读锁或写锁的读写锁的指针。函数调用成功时返回0,失败时返回错误码。
读写锁的获取和释放应该配对使用,即获取读锁后应该释放读锁,获取写锁后应该释放写锁,否则可能会导致死锁或其它并发问题。
②:用例
通过两个写线程和六个读线程来验证读写锁的作用。
#include <stdio.h>
#include <unistd.h>
#include <pthread.h>//static pthread_rwlock_t rwlock = PTHREAD_RWLOCK_INITIALIZER;静态初始化
pthread_rwlock_t rwlock;
int shared_data = 0;void *lock_reader(void *argv){pthread_rwlock_rdlock(&rwlock);printf("this is %s,value is %d\n",(char *)argv,shared_data);pthread_rwlock_unlock(&rwlock);
}void *lock_writer(void *argv){//给线程添加写锁pthread_rwlock_wrlock(&rwlock);int tep = shared_data + 1;sleep(1);shared_data = tep;printf("this is %s,shared_data++\n",(char *)argv);pthread_rwlock_unlock(&rwlock);
}int main(int argc, char const *argv[])
{// 避免写饥饿,将写优先级设置高于读优先级// pthread_rwlockattr_t attr;// pthread_rwlockattr_init(&attr);// pthread_rwlockattr_setkind_np(&attr,PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP);// pthread_rwlock_init(&rwlock,&attr);//显式初始化pthread_rwlock_init(&rwlock,NULL);pthread_t writer1,writer2,reader1,reader2,reader3,reader4,reader5,reader6;//创建两个写线程pthread_create(&writer1,NULL,lock_writer,"writer1");pthread_create(&writer2,NULL,lock_writer,"writer2");sleep(3);pthread_create(&reader1,NULL,lock_reader,"reader1");pthread_create(&reader2,NULL,lock_reader,"reader2");pthread_create(&reader3,NULL,lock_reader,"reader3");pthread_create(&reader4,NULL,lock_reader,"reader4");pthread_create(&reader5,NULL,lock_reader,"reader5");pthread_create(&reader6,NULL,lock_reader,"reader6");//等待创建的子线程运行完成pthread_join(writer1,NULL);pthread_join(writer2,NULL);pthread_join(reader1,NULL);pthread_join(reader2,NULL);pthread_join(reader3,NULL);pthread_join(reader4,NULL);pthread_join(reader5,NULL);pthread_join(reader6,NULL);//销毁读写锁pthread_rwlock_destroy(&rwlock);return 0;
}
3、自旋锁
自旋锁是一种基于忙等待的同步原语,它使得一个线程在获取锁失败时不会立即阻塞,而是反复地检查锁是否可用,直到获取到锁为止。
自旋锁适用于保护临界区比较短的情况。如果临界区的执行时间比较长,自旋等待会造成CPU资源的浪费。在这种情况下,应该考虑使用其他同步原语,比如互斥锁或条件变量。
4、条件变量(condition variable)
①:相关函数
条件变量是一种用于线程间通信的机制,它可以使一个线程等待,直到另一个线程满足某个条件。
条件变量需要和互斥锁(mutex)一起使用,因为条件变量的等待和唤醒操作需要在互斥锁的保护下进行,以确保线程间的同步和互斥。
相关函数有以下几个:
pthread_cond_init
:用于初始化一个条件变量。函数原型如下:
int pthread_cond_init(pthread_cond_t *cond, pthread_condattr_t *attr);
参数cond
是一个指向条件变量对象的指针,attr
是一个指向条件变量属性的指针(通常可以传入NULL)。
下面代码用于静态初始化:
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_destroy
:用于销毁一个条件变量。函数原型如下:
int pthread_cond_destroy(pthread_cond_t *cond);
参数cond
是一个指向要销毁的条件变量对象的指针。
pthread_cond_wait
:用于使线程等待条件变量满足。函数原型如下:
int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex);
参数cond
是一个指向要等待的条件变量对象的指针,mutex
是一个指向互斥锁对象的指针。调用该函数后,线程会阻塞,直到另一个线程调用pthread_cond_signal()
或pthread_cond_broadcast()
来唤醒等待的线程。
pthread_cond_signal
:用于唤醒一个等待的线程(如果在这个函数调用之前没有调用pthread_cond_wait 函数
,则该函数不会唤醒任何线程)。函数原型如下:
int pthread_cond_signal(pthread_cond_t *cond);
参数cond
是一个指向要唤醒的条件变量对象的指针。调用该函数会唤醒一个处于等待状态的线程。
pthread_cond_broadcast
:用于广播唤醒所有等待的线程。函数原型如下:
int pthread_cond_broadcast(pthread_cond_t *cond);
参数cond
是一个指向要广播唤醒的条件变量对象的指针。调用该函数会唤醒所有处于等待状态的线程。
使用条件变量的一般流程如下:
1、初始化条件变量和互斥锁。
2、在需要等待条件变量满足的线程中,使用pthread_cond_wait()
函数等待条件变量。
3、在满足条件时,通过持有互斥锁来保护共享数据,然后对需要等待的线程进行唤醒:如果只需唤醒一个线程,可以使用pthread_cond_signal()
函数。如果需要唤醒所有等待线程,可以使用pthread_cond_broadcast()
函数。
4、销毁条件变量和互斥锁。
②:用例
该用例创建两个线程,然后在两个线程中分别进行读写操作:写满数据后唤醒读操作,读完数据后唤醒写操作。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>#define BUFFER_SIZE 5
int buffer[BUFFER_SIZE];
int count = 0;//初始化互斥锁
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
//初始化条件变量
static pthread_mutex_t cond = PTHREAD_COND_INITIALIZER;//读数据
void *consumer(void *argv){pthread_mutex_lock(&mutex);while (1){//获取锁// pthread_mutex_lock(&mutex);if (count == 0){//缓存中没数据,,暂停线程pthread_cond_wait(&cond,&mutex);}printf("接收到的数字为%d\n",buffer[--count]);//唤醒生产者pthread_cond_signal(&cond);//释放锁// pthread_mutex_unlock(&mutex);}pthread_mutex_unlock(&mutex);}
//写数据
void *producer(void *argv){int item = 1;pthread_mutex_lock(&mutex);while (1){//获取锁// pthread_mutex_lock(&mutex);//如果缓冲区满,使用条件变量暂停线程if (count == BUFFER_SIZE){//暂停线程pthread_cond_wait(&cond,&mutex);}//缓存区没满buffer[count++] = item++;printf("发送数字%d\n",buffer[count-1]);//唤醒消费者pthread_cond_signal(&cond);//释放锁// pthread_mutex_unlock(&mutex);}pthread_mutex_unlock(&mutex);
}int main(int argc, char const *argv[])
{//创建读写线程pthread_t producer_thread,consumer_thread;pthread_create(&producer_thread,NULL,producer,NULL);pthread_create(&consumer_thread,NULL,consumer,NULL);//等待两个线程完成pthread_join(producer_thread,NULL);pthread_join(consumer_thread,NULL);return 0;
}
5、信号量(Semaphore)
①:相关函数
信号量可以用于线程间的通信,也可以同于进程间的通信,信号量可以分为有名和无名信号量,与管道类似。主要用来实现互斥和同步。
互斥:确保多个进程或线程不会同时访问临界区。
同步:协调多个进程或线程的执行顺序确保它们按照一定的顺序执行。
相关函数有以下几个:
sem_init:用于初始化一个信号量。
int sem_init(sem_t *sem, int pshared, unsigned int value);
其中,sem
是一个信号量对象的指针,pshared
指定信号量是在进程内共享(1)还是在线程内共享(0),value
是信号量的初始值。
sem_wait:用于获取一个信号量,如果信号量的值大于0,就将其减一;如果信号量的值等于0,调用线程将被阻塞,直到信号量的值大于0为止。
int sem_wait(sem_t *sem);
sem_post:用于释放一个信号量,将信号量的值加一。
int sem_post(sem_t *sem);
sem_destroy:用于销毁一个信号量对象。
int sem_destroy(sem_t *sem);
②:用例
1、线程中使用匿名信号量控制两个线程有顺序的进行读写
#include <stdio.h>
#include <unistd.h>
#include <semaphore.h>
#include <stdlib.h>
#include <time.h>
#include <pthread.h>sem_t *full;
sem_t *empty;int shard_num;int rand_num(){srand(time(NULL));return rand();
}//实现发送读取发送读取依次按序执行
void *producer(void *argv){for (int i = 0; i < 5; i++){//获取信号量sem_wait(empty);printf("\n第%d轮数据传输\n",i + 1);sleep(1);shard_num = rand_num();//释放信号量sem_post(full);}
}void *consumer(void *argv){for (int i = 0; i < 5; i++){//获取信号量sem_wait(full);printf("\n第%d轮数据读取,数据为:%d\n",i + 1,shard_num);sleep(1);//释放信号量sem_post(empty);}
}int main(int argc, char const *argv[])
{//初始化信号量full = malloc(sizeof(sem_t));empty = malloc(sizeof(sem_t));sem_init(empty,0,1);sem_init(full,0,0);//创建生产者,消费者线程pthread_t producer_id,consumer_id;pthread_create(&producer_id,NULL,producer,NULL);pthread_create(&producer_id,NULL,consumer,NULL);//等待线程全部执行完成pthread_join(producer_id,NULL);pthread_join(consumer_id,NULL);//摧毁信号量sem_destroy(empty);sem_destroy(full);return 0;
}
2、进程中使用匿名信号量控制子进程先于父进程运行。(在进程中通讯信号量需创建在共享内存中)
#include <pthread.h>
#include <stdio.h>
#include <unistd.h>
#include <fcntl.h>
#include <semaphore.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <sys/wait.h>int main(int argc, char const *argv[])
{char *shm_name = "unnamed_sem_shm";//创建内存共享对象int fd = shm_open(shm_name,O_CREAT|O_RDWR,0666);//调整大小ftruncate(fd,sizeof(sem_t));//映射到内存区sem_t *sem = mmap(NULL,sizeof(sem_t),PROT_READ|PROT_WRITE,MAP_SHARED,fd,0);//初始化信号量sem_init(sem,1,0);pid_t pid = fork();if (pid < 0){perror("fork");}else if (pid == 0){sleep(1);//睡眠1s保证父进程先运行printf("this is son\n");sem_post(sem);//sem+1释放信号量,父进程被唤醒}else{sem_wait(sem);//因为sem=0,会阻塞printf("this is father\n");waitpid(pid,NULL,0);}//摧毁信号量// 父进程执行到此处子进程已执行完毕可以销毁信号量// 子进程执行到此处父进程仍在等待信号量此时销毁会导致未定义行为if (pid > 0){if(sem_destroy(sem) == -1){perror("sem_destroy");}}// 父子进程都应该解除映射关闭文件描述符if (munmap(sem, sizeof(sem_t)) == -1){perror("munmap");}if (close(fd) == -1){perror("close");}// shm_unlink 只能调用一次只在父进程中调用if (pid > 0){if (shm_unlink(shm_name) == -1){perror("father shm_unlink");}}return 0;
}
3、在进程中使用有名信号量实现类似互斥锁的功能
其中:shm_open;sem_close;sem_unlink 是有名信号量需要使用到的特殊函数。
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <semaphore.h>int main(int argc, char const *argv[])
{char shm_value_name[100]={0};sprintf(shm_value_name,"/value%d",getpid());char *sem_name = "/named_sem_shm";// 1.创建共享内存对象int value_fd = shm_open(shm_value_name,O_RDWR | O_CREAT,0644);//初始化有名信号量sem_t *sem = sem_open(sem_name,O_CREAT,0666,1);if (value_fd < 0){perror("shm_open");exit(EXIT_FAILURE);}// 2.设置共享内存对象大小ftruncate(value_fd,sizeof(int));// 3.内存映射int *share = mmap(NULL,sizeof(int),PROT_READ|PROT_WRITE,MAP_SHARED,value_fd,0);if (share == MAP_FAILED){perror("mmap");exit(EXIT_FAILURE);}// 初始化共享变量的值*share = 0;// 4.使用内存映射实现进程间的通讯pid_t pid = fork();if (pid < 0){perror("fork");exit(EXIT_FAILURE);}else if (pid == 0){sem_wait(sem);int tmp = *share + 1;sleep(1);*share = tmp;sem_post(sem);}else{sem_wait(sem);int tmp = *share + 1;sleep(1);*share = tmp;sem_post(sem);// 等待子进程结束waitpid(pid,NULL,0);printf("the final share is %d\n",*share); }// 5.释放映射区,父子都释放if (munmap(share,sizeof(int)) == -1){perror("munmap");}if(close(value_fd) == -1){perror("close_value");}if (sem_close(sem) == -1){perror("sem_close");}// 6.释放共享内存对象if (pid > 0){if (shm_unlink(shm_value_name) == -1){perror("value_unlink");}if (sem_unlink(sem_name) == -1){perror("value_unlink");}}return 0;
}
四、线程池
1、相关函数
线程池是一种管理线程的机制,可以用于控制并发任务的数量。线程池可以提高程序的性能和吞吐量,同时减少线程的创建和销毁的开销。线程池通常由一个固定数量的线程组成,并使用任务队列来存储待执行的任务。
要使用线程池,可以使用GLib库中的线程池相关函数。GLib是Gnome项目的核心库,提供了一组丰富的功能和工具。
以下是使用GLib库中线程池相关函数的一般步骤:
g_thread_pool_new 创建线程池:
GThreadPool *pool = g_thread_pool_new(worker_func, user_data, max_threads, exclusive);
worker_func
是线程池中线程的任务函数,用于执行具体的工作。
user_data
是传递给任务函数的用户数据。
max_threads
是线程池中允许的最大线程数。
exclusive
指定线程池是否为独占模式。
g_thread_pool_push 添加任务到线程池:
g_thread_pool_push(pool, task_data, NULL);
task_data
是要执行的任务数据。
g_thread_pool_wait 等待任务完成:
g_thread_pool_wait(pool);
g_thread_pool_free 销毁线程池:
g_thread_pool_free(pool, TRUE, TRUE);
第一个参数是线程池对象。
第二个参数指定是否等待所有任务完成后再销毁线程池。
第三个参数指定是否等待所有线程完成后再销毁线程池。
2、用例
该用例创建10个任务并发运行,因为设置线程池允许最大线程数为5,所以运行结果应该是:先创建五个线程,然后等一个线程运行结束后再创建另一个线程,也就是说最多只有五个线程在同时运行。
#include <glib.h>
#include <stdio.h>
#include <unistd.h>
#include <stdlib.h>void task_func(gpointer data,gpointer user_data){int task_num = *(int *)data;free(data);printf("第 %d 个任务开始执行\n",task_num);sleep(1);printf("第 %d 个任务执行完成\n",task_num);
}int main(int argc, char const *argv[])
{//创建线程池GThreadPool *thread_pool = g_thread_pool_new(task_func,NULL,5,TRUE,NULL);//添加任务for (int i = 0; i < 10; i++){int *tmp = malloc(sizeof(int));*tmp = i+1;g_thread_pool_push(thread_pool,tmp,NULL);}//等待所有任务完成g_thread_pool_free(thread_pool,FALSE,TRUE);printf("All task completed\n");return 0;
}
要运行含 glib.h 的函数需要先安装该库
sudo apt-get update
sudo apt-get install libglib2.0-dev
然后在Makefile文件中链接该库
thread_pool_test: thread_pool_test.c-$(CC) -o $@ $^ `pkg-config --cflags --libs glib-2.0`-./$@-rm ./$@