目录
- `🍁POSIX信号量 `
- `🍁信号量的相关接口介绍`
- *初始化信号量*
- *销毁信号量*
- *等待信号量*
- *发布信号量*
- `🍁(POSIX信号量):基于环形队列的生产消费模型`
- `🍁基于环形队列的生产消费模型代码实现`
🍁POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步
操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步
。
🍁信号量的相关接口介绍
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
- 参数:
- pshared:0表示线程间共享,非零表示进程间共享
- value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
- 功能:等待信号量,会将信号量的值减1
int sem_wait(sem_t *sem); //P()
发布信号量
- 功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
注意:
申请成功后,不需要判断资源是否具备条件,因为申请成功后,就一定有对应的资源提供给你,从而可以有效减少内部判断
(与条件变量相比,代码上少很多的判断
)。
🍁(POSIX信号量):基于环形队列的生产消费模型
上一节生产者-消费者的例子是基于阻塞队列实现的,其空间可以动态分配,现在基于固定大小的环形队列重写这个程序
环形队列采用数组模拟,用模运算来模拟环状特性
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。
🍁基于环形队列的生产消费模型代码实现
实现思想:
定义两个下标,记录生产者和消费者的位置,便于在指定下标下生产和执行任务,定义两个信号量,生产者和消费者各一个,记录自己的资源,定义两个锁,生产者消费者各一个,为了让生产者之间与消费者之间实现互斥。
伪代码:
main.cc
#include <iostream>
#include "thread.hpp"
#include "ringqueue.hpp"
#include <vector>
#include <string>using namespace std;
using namespace ThreadModule;void consumer(Ring_Queue<int> &rq)
{sleep(1);while (true){sleep(2);int data = 0;rq.pop(&data);cout << "consumer consume data is:" << data << endl;}
}void product(Ring_Queue<int> &rq)
{sleep(1);int cnt = 10;while (true){rq.Enqueue(cnt); //让数字模拟任务cout << "productor product data is:" << cnt++ << endl;}
}void InitConsumer(vector<Thread<Ring_Queue<int>>> *threads, int n, Ring_Queue<int> &rq)
{for (int i = 0; i < n; i++){string name = "comsumer -" + to_string(i + 1);threads->emplace_back(consumer, rq, name);}
}void Initproductor(vector<Thread<Ring_Queue<int>>> *threads, int n, Ring_Queue<int> &rq)
{for (int i = 0; i < n; i++){string name = "product -" + to_string(i + 1);threads->emplace_back(product, rq, name);}
}void StartAll(vector<Thread<Ring_Queue<int>>> &threads)
{for (auto &thread : threads){thread.start();}
}void waitAllthread(vector<Thread<Ring_Queue<int>>> &threads)
{for (auto &thread : threads){thread.join();}
}int main()
{Ring_Queue<int> *rq = new Ring_Queue<int>(5);vector<Thread<Ring_Queue<int>>> threads; InitConsumer(&threads, 3, *rq); //初始化消费者线程(创建)Initproductor(&threads, 1, *rq); //初始化生产者线程(创建)StartAll(threads); //启动所有的线程waitAllthread(threads);return 0;
}
ringqueue.hpp
#pragma
#include <iostream>
#include <vector>
#include <semaphore.h>
#include <pthread.h>using namespace std;
template <class T>
class Ring_Queue
{
public://封装PV操作void P(sem_t *p){sem_wait(p);}void V(sem_t *v){sem_post(v);}public:Ring_Queue(int cap) : _ring_queue(cap), _cap(cap), _consumer_dex(0), _product_dex(0){sem_init(&_room, 0, _cap);sem_init(&_data, 0, 0);pthread_mutex_init(&_consumer_mutex, nullptr);pthread_mutex_init(&_product_mutex, nullptr);}void Enqueue(const T &in){//申请信号量,如果申请成功,则说明有对应的资源,内部不用判断P(&_room);//加锁,目的:让生产者之间互斥pthread_mutex_lock(&_product_mutex);// 生产_ring_queue[_product_dex++] = in;_product_dex %= _cap;pthread_mutex_unlock(&_product_mutex);V(&_data);}void pop(T *out){P(&_data);//目的:让消费者之间互斥pthread_mutex_lock(&_consumer_mutex);// 消费*out = _ring_queue[_consumer_dex++];_consumer_dex %= _cap;pthread_mutex_unlock(&_consumer_mutex);V(&_room);}~Ring_Queue(){sem_destroy(&_room);sem_destroy(&_data);pthread_mutex_destroy(&_consumer_mutex);pthread_mutex_destroy(&_product_mutex);}private:// 1.环形队列vector<T> _ring_queue; //循环队列int _cap; //队列总容量int _consumer_dex; //消费者的下标int _product_dex; //生产者的下标sem_t _room; //生产者的信号量sem_t _data; //消费者的信号量pthread_mutex_t _consumer_mutex; //消费者间的锁pthread_mutex_t _product_mutex; //生产者间的锁
};
thread.hpp
#ifndef __THREAD_HPP__
#define __THREAD_HPP__#include <iostream>
#include <unistd.h>
#include <functional>
#include <string>namespace ThreadModule
{template <class T>using func_t = std::function<void(T &)>;template <class T>class Thread{public:Thread(func_t<T> func, T &data, std::string &name): _func(func), _data(data), _name(name), _stop(true){}void execute(){_func(_data);}static void *threadrun(void *args){Thread<T> *self = static_cast<Thread<T> *>(args);self->execute();return nullptr;}bool start(){int n = pthread_create(&_tid, nullptr, threadrun, this);if (!n){_stop = false;return true;}else{return false;}}void stop(){_stop = true;}void detach(){if (!_stop)pthread_detach(_tid);}std::string name(){return _name;}void join(){if (!_stop)pthread_join(_tid, nullptr);}~Thread() {}private:pthread_t _tid;func_t<T> _func;T &_data;std::string _name;bool _stop;};
}#endif