一、什么是 POSIX 信号量
信号量本质就是一个统计资源数量的计数器。
1、PV 操作
pv操作就是一种让信号量变化的操作。其中 P 操作可以让信号量减 1(如果信号量大于 0),V 操作可以让信号量加 1.
2、信号量类型——sem_t
3、相关函数
3.1. 初始化信号量
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared: 0表示线程间共享,非零表示进程间共享
value:信号量初始值
3.2. 销毁信号量
int sem_destroy(sem_t *sem);
3.3. P 操作
int sem_wait(sem_t *sem);
3.4. V 操作
int sem_post(sem_t *sem);
二、环形生产者消费者模型
1、原理
对于这个模型有一个规则:
- 只能有一个消费者和一个生产者访问这个环形内存;不能多个消费者和多个生产者同时访问这个环形内存。
- 消费者不能超过生产者,也不能套生产者的圈。
- 生产者不能套消费者的圈。
2、代码
#pragma once#include <vector>
#include <semaphore.h>
#include <pthread.h>const int default_cap = 5;template<class T>
class ring_queue
{
private:void P(sem_t& sem) { sem_wait(&sem); }void V(sem_t& sem) { sem_post(&sem); }void Lock(pthread_mutex_t& mutex) { pthread_mutex_lock(&mutex); }void UnLock(pthread_mutex_t& mutex) { pthread_mutex_unlock(&mutex); }
public:ring_queue(int capacity = default_cap) : _capacity(capacity), _arr(std::vector<T>(capacity)){sem_init(&_sem_consumer, 0, 0);sem_init(&_sem_producer, 0, _capacity);pthread_mutex_init(&_mutex_consumer, nullptr);pthread_mutex_init(&_mutex_producer, nullptr);}~ring_queue(){sem_destroy(&_sem_consumer), sem_destroy(&_sem_producer);pthread_mutex_destroy(&_mutex_consumer), pthread_mutex_destroy(&_mutex_producer);}void push(const T& in){P(_sem_producer); // pv 操作一步到位,不会被中断,因此可以保证结果正确Lock(_mutex_producer); // _index_producer 下标互斥_arr[_index_producer] = in;_index_producer = (_index_producer + 1) % _capacity;UnLock(_mutex_producer);V(_sem_consumer);}T pop(){P(_sem_consumer); // pv 操作一步到位,不会被中断,因此可以保证结果正确Lock(_mutex_consumer); // _index_consumer 下标互斥T out = _arr[_index_consumer];_index_consumer = (_index_consumer + 1) % _capacity;UnLock(_mutex_consumer);V(_sem_producer);return out;}
private:std::vector<T> _arr;int _capacity;int _index_consumer, _index_producer;sem_t _sem_consumer, _sem_producer;pthread_mutex_t _mutex_consumer, _mutex_producer;
};
三、线程池
1、结构
因为中间的共享内存是被写任务的线程和拿任务的线程共享,因此我们可以把线程池看作是多生产者和多消费者的 CP 模型。
四、线程安全单例模式
1、什么是单例模式
只能建立一个对象的设计模式。
2、单例模式类型
2.1. 懒汉模式
获取实例的时候才开辟空间并初始化。
2.2. 饿汉模式
获取实例前就已经开辟空间并初始化好了。
3、代码(懒汉版)
#ifndef __THREAD_POOL_HPP__
#define __THREAD_POOL_HPP__#include <vector>
#include <queue>
#include <pthread.h>const int default_cap = 5;class thread_info
{pthread_t _tid;std::string name;
};template<class task>
class thread_pool
{
private:void lock() { pthread_mutex_lock(&_mutex); }void unlock() { pthread_mutex_unlock(&_mutex); }~thread_pool() { pthread_mutex_destroy(&_mutex); pthread_cond_destroy(&_cond); }thread_pool(int capacity = default_cap) : _capacity(capacity), _tasks(std::vector<task>(capacity)){pthread_mutex_init(&_mutex, nullptr);pthread_cond_init(&_cond, nullptr);}thread_pool(const thread_pool<task>& pool) = delete;thread_pool<task>& operator=(const thread_pool<task>& pool) = delete;
public:static thread_pool<task>* get_instance(){if (pt == nullptr) // 第一次申请单例时才要考虑多线程互斥问题,因为后面的 _pt 都不为空了,因此不用判断 _pt 是否为空{pthread_mutex_lock(&mutex);if (pt != nullptr) return pt;pthread_mutex_unlock(&mutex);thread_pool<task>* ret = new thread_pool<task>;}return ret;}// pthread_create 要求的函数是 void* start_routine(void* args),而如果不加 static,则为 void* start_routine(thread_pool<task>* this, void* args)static void* start_routine(void* args) {thread_pool<task>* obj = static_cast<thread_pool<task>*>(args);obj->lock();while ((obj->_tasks).empty()) pthread_cond_wait(&_cond, &_mutex);task t = obj->pop();obj->unlock();// run treturn t;}void start(){for (int i = 0; i < _capacity; i++)_threads[i].name = "thread - " + std::to_string(i),pthread_create(&(_threads[i]._tid), nullptr, start_routine, this);}void push(const task& in){lock();_tasks.push(in);pthread_cond_signal(&_cond);unlock();}task pop() // 可保证在调 pop 方法时只有一个线程在调{task out = _tasks.front();_tasks.pop();return out;}
private:std::vector<thread_info> _threads;std::queue<task> _tasks;int _capacity;pthread_mutex_t _mutex; // 抢任务执行pthread_cond_t _cond; // 消费者的阻塞队列static thread_pool<task>* pt;static pthread_mutex_t mutex;
};template<class task>
thread_pool<task>* thread_pool<task>::pt = nullptr;template<class task>
pthread_mutex_t thread_pool<task>::mutex = PTHREAD_MUTEX_INITIALIZER;#endif