【Linux】多线程 -> 线程同步与基于BlockingQueue的生产者消费者模型-CSDN博客
我们先发现之前写的基于BlockingQueue的生产者消费者模型有什么不足的地方?
- 一个线程,在操作临界资源的时候,必须是满足条件的。
- 可是,临界资源是否满足生产或消费的条件,我们无法直接得知。(在没有访问临界资源之前,无法得知)
- 只能先加锁,再检测;再操作,再解锁。
我们在操作临界资源的时候,有可能不就绪,但是我们无法提前得知,所以,只能先加锁,再检测,根据检测结果,决定下一步操作。
- 只要我们对资源进行整体加锁,就默认了我们对这个资源的整体使用。
- 实际情况可能存在:一份公共资源,允许多个线程同时访问不同的区域!
每一次在进入临界区访问临界资源时,永远都是先检测资源是否就绪,检测过程必须得访问临界资源,前提就需要加锁,所以永远都是先加锁再判断再解锁,我们无法再访问之前得知资源的使用情况。信号量就将检测资源状态放在了申请资源之前进行。并且,一份公共资源,允许多个线程同时访问不同区域(程序员编码保证不同的线程可以并发的访问公共资源的不同区域)的场景下,就可以使用信号量。
POSIX信号量
POSIX信号量和SystemV信号量作用相同,都是用于同步操作,达到无冲突的访问共享资源目的。 但POSIX可以用于线程间同步。
- 信号量本质就是一个计数器,衡量临界资源数量多少的计数器。
- 信号量的值代表可用资源的数量,线程或进程可以通过特定操作来改变信号量的值,以此实现对共享资源的同步访问。
- 只要拥有信号量,就在未来一定能拥有临界资源的一部分。
- 申请信号量的本质:对临界资源中特定的小块资源做预订机制。
所以,在访问临界资源之前,就可以通过申请信号量的方式来得知临界资源的使用情况。确认资源有没有就绪。申请成功,就有“你”的资源;申请失败,就说明条件不就绪,没有“你”的资源,只能等条件就绪。
计数器 递减or递增 伪代码:
sem_t sem = 10;
sem--; --- 申请资源 --- 必须保证操作的原子性 --- P操作(能保证原子性)
sem++; --- 归还资源 --- 必须保证操作的原子性 --- V操作(能保证原子性)
信号量核心操作:PV原语
信号量相关操作
初始化信号量
#include <semaphore.h>
int sem_init(sem_t *sem, int pshared, unsigned int value);
参数:
pshared:0表示线程间共享,非0表示进程间共享
value:信号量初始值
销毁信号量
int sem_destroy(sem_t *sem);
等待信号量
功能:等待信号量,会将信号量的值减1。
int sem_wait(sem_t *sem); //P()
发布信号量
功能:发布信号量,表示资源使用完毕,可以归还资源了。将信号量值加1。
int sem_post(sem_t *sem);//V()
基于RingQueue的生产者消费者模型
上一文中,我们已经写了基于BlockingQueue的生产者消费者模型,其空间可以动态分配。现在基于固定大小的环形队列实现一个生产者消费者模型。
- 环形队列采用数组模拟,用模运算来模拟环状特性。
环形结构起始状态和结束状态都是一样的,不好判断为空或者为满,所以可以通过加计数器或者标记位来判断满或者空。另外也可以预留一个空的位置,作为满的状态。
![]()
但是我们现在有信号量这个计数器,就很简单的进行多线程间的同步过程。
- 生产者和消费者什么情况下可能会访问同一位置?
- 空的时候。
- 满的时候。
- 其他情况下,生产者和消费者访问的就不是不同位置!
- 为了完成环形队列的生产者消费者问题,核心工作是什么?
- 生产者不能超过消费者,反之也是。
- 生产者不能把消费者“套一个圈”以上,反之也是。
- 生产者和消费者什么时候会在同样位置呢?a.为空 b.为满。
伪代码:
-
对于生产者而言,生产者关注的是什么?队列中的剩余空间-- 空间资源定义一个信号量。
-
对于消费者而言,消费者关注的是什么?放入队列中的数据-- 数据资源定义一个信号量。
在环形队列中,大部分情况下,单生产者和单消费者是可以并发执行的!只有在队列为空或为满的时候,才有互斥与同步问题!
未来,生产者和消费者的位置我们要想清楚:
- 其实就是队列中的下标
- 一定是两个下标
- 队列为空或者为满,下标相同
通过下标让生产者和消费者访问不同的位置。也可以通过下标给线程指派它要访问的资源。
下面我们以单生产者和单消费者为例实现基于RingQueue的生产者消费者模型。
makefile:
ringqueue:main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f ringqueue
RingQueue.hpp:
#pragma once#include <iostream>
#include <vector>
#include <semaphore.h>
#include <cassert>static const int gmaxcap = 5;template <class T>
class RingQueue
{
public:RingQueue(const int &cap = gmaxcap): _queue(cap), _cap(cap){int n = sem_init(&_spaceSem, 0, _cap);assert(n == 0);n = sem_init(&_dataSem, 0, 0);assert(n == 0);_producerStep = _consumerStep = 0;}void P(sem_t &sem){int n = sem_wait(&sem);assert(n == 0);//if(void)n;}void V(sem_t &sem){int n = sem_post(&sem);assert(n == 0);(void)n;}//生产者void push(const T &in){P(_spaceSem); // 申请到了空间信号量,意味着,生产者可以正常生产。_queue[_producerStep++] = in;_producerStep %= _cap;V(_dataSem);}//消费者void pop(T *out){P(_dataSem); // 申请到了数据信号量,意味着,消费者可以正常消费*out = _queue[_consumerStep++];_consumerStep %= _cap;V(_spaceSem);}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);}private:std::vector<int> _queue; // 数组模拟环形队列int _cap; // 环形队列容量sem_t _spaceSem; // 生产者,想生产,看重的是空间资源sem_t _dataSem; // 消费者,想消费,看重的是数据资源int _producerStep; // 生产数据的下标int _consumerStep; // 消费数据的下标
};
main.cc:
#include "RingQueue.hpp"
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>void *ProducerRoutine(void *rq)
{RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);while (true){int data = rand() % 10 + 1;ringqueue->push(data);std::cout << "生产完成,生产的数据是:" << data << std::endl; // 和阻塞队列不同的是,如果生产和消费的不是同一个位置,生产和消费是可以同时并发执行的。sleep(1);}
}
void *ConsumerRoutine(void *rq)
{RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);while (true){int data;ringqueue->pop(&data);std::cout << "消费完成,消费的数据是:" << data << std::endl;//sleep(1);}
}int main()
{srand((unsigned int)time(nullptr) ^ getpid());RingQueue<int> *rq = new RingQueue<int>();pthread_t p, c;pthread_create(&p, nullptr, ProducerRoutine, rq);pthread_create(&p, nullptr, ConsumerRoutine, rq);pthread_join(p, nullptr);pthread_join(c, nullptr);return 0;
}
这样,我们就完成了基于RingQueue环形队列的单生产者单消费者模型。
当在环形队列里进行生产消费活动时,生产者和消费者只有在队列空或者为满时才访问同一个位置,其他情况都是指向不同位置,只要分配合理,就可以实现一定程度的生产和消费的并发。
当为空为满,生产者和消费者指向同一个位置时,生产消费的行为呈现出非常强烈的互斥(只能有一个访问)和同步(为空时,生产者先访问;为满时,消费者先访问)。
下面我们来分析一下各种情况:
- 队列为空时
producer_sem为10,consumer_sem为0。生产者和消费者谁先运行并不确定。如果消费者先运行,先申请信号量,但是此时,consumer_sem为0,P(consumer_sem)申请信号量就不成功,消费者立即就会被阻塞挂起,等待生产者生产。
也就是环形队列为空的时候,生产者消费者谁先运行并不确定,消费者不会进入临界资源里访问,只能等待生产者先访问,也就是他们两个是按照一定顺序执行的。
- 如果生产者一直生产,消费者不消费:可不可能生产者生产完了,会超过消费者呢?
生产者一直生产,消费者不消费。生产者一直P(producer_sem)申请信号量,直到最后producer_sem为0,环形队列为满的时候,生产者再生产,去P(producer_sem)的时候,就申请不到了,生产者就会在申请信号量处将自己阻塞挂起,必须得等待消费者来消费。所以,生产者无法把消费者“套一个圈”。
- 队列为满时
producer_sem为0,consumer_sem为10。生产者和消费者谁先运行并不确定。如果生产者先被调度,先申请信号量,但是此时,producer_sem已经为0,P(producer_sem)申请信号量就不成功,生产者立即就被阻塞挂起了,等待消费者消费。
也就是环形队列为满的时候,生产者消费者谁先运行并不确定,生产者不会进入临界资源里访问,只能等待消费者先访问,也就是他们两个是按照一定顺序执行的。
- 如果消费者一直消费,生产者不生产:可不可能消费者消费完了,会超过生产者呢?
消费者一直消费,生产者不生产。消费者一直P(consumer_sem)申请信号量,直到最后consumber_sem为0,环形队列为空的时候,消费者再消费,去P(consumer_sem)的时候,就申请不到了,消费者就在申请信号量处将自己阻塞挂起,必须得等生产者来生产。所以,消费者无法把生产者“套一个圈”。
main.cc:
按照上述情况分析,让生产者sleep1秒,消费者一直消费,最终结果就会是生产者生产一个,消费者消费一个。
void *ProducerRoutine(void *rq)
{RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);while (true){int data = rand() % 10 + 1;ringqueue->push(data);std::cout << "生产完成,生产的数据是:" << data << std::endl; // 和阻塞队列不同的是,如果生产和消费的不是同一个位置,生产和消费是可以同时并发执行的。sleep(1);}
}
void *ConsumerRoutine(void *rq)
{RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);while (true){int data;ringqueue->pop(&data);std::cout << "消费完成,消费的数据是:" << data << std::endl;//sleep(1);}
}
main.cc:
按照上述情况分析,让消费者sleep1秒,生产者一直生产,最终结果就会是消费者消费一个,生产者生产一个。
void *ProducerRoutine(void *rq)
{RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);while (true){int data = rand() % 10 + 1;ringqueue->push(data);std::cout << "生产完成,生产的数据是:" << data << std::endl; // 和阻塞队列不同的是,如果生产和消费的不是同一个位置,生产和消费是可以同时并发执行的。//sleep(1);}
}
void *ConsumerRoutine(void *rq)
{RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);while (true){int data;ringqueue->pop(&data);std::cout << "消费完成,消费的数据是:" << data << std::endl;sleep(1);}
}
同样,我们也可以实现让生产者给消费者派发任务。生产者给消费者派发任务,消费者执行任务。
makefile.cc:
ringqueue:main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f ringqueue
RingQueue.hpp:
#pragma once
#include "Task.hpp"#include <iostream>
#include <vector>
#include <semaphore.h>
#include <cassert>static const int gmaxcap = 5;template <class T>
class RingQueue
{
public:RingQueue(const int &cap = gmaxcap): _queue(cap), _cap(cap){int n = sem_init(&_spaceSem, 0, _cap);assert(n == 0);n = sem_init(&_dataSem, 0, 0);assert(n == 0);_producerStep = _consumerStep = 0;}void P(sem_t &sem){int n = sem_wait(&sem);assert(n == 0); // if(void)n;}void V(sem_t &sem){int n = sem_post(&sem);assert(n == 0);(void)n;}// 生产者void Push(const T &in){P(_spaceSem); // 申请到了空间信号量,意味着,生产者可以正常生产。_queue[_producerStep++] = in;_producerStep %= _cap;V(_dataSem);}// 消费者void Pop(T *out){P(_dataSem); // 申请到了数据信号量,意味着,消费者可以正常消费*out = _queue[_consumerStep++];_consumerStep %= _cap;V(_spaceSem);}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);}private:std::vector<T> _queue; // 数组模拟环形队列int _cap; // 环形队列容量sem_t _spaceSem; // 生产者,想生产,看重的是空间资源sem_t _dataSem; // 消费者,想消费,看重的是数据资源int _producerStep; // 生产数据的下标int _consumerStep; // 消费数据的下标
};
Task.hpp:
#pragma once#include <iostream>
#include <cstdio>
#include <string>
#include <functional>class Task
{using func_t = std::function<int(int, int, char)>;// typedef std::function<int(int,int,char)>func_t;
public:Task(){}Task(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callback(func){}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);return buffer;}std::string toTaskString(){char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);return buffer;}private:int _x;int _y;char _op;func_t _callback;
};const std::string oper = "+-*/%";int mymath(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero error!" << std::endl;result = -1;}else{result = x / y;}}break;case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;result = -1;}else{result = x % y;}}break;default:break;}return result;
}
main.cc:
#include "RingQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>void *ProducerRoutine(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while (true){// version1// int data = rand() % 10 + 1;// ringqueue->push(data);// std::cout << "生产完成,生产的数据是:" << data << std::endl; // 和阻塞队列不同的是,如果生产和消费的不是同一个位置,生产和消费是可以同时并发执行的。// sleep(1);// version2// 构建or获取任务int x = rand() % 100 + 1;int y = rand() % 10;char op = oper[rand() % oper.size()];// 生产任务Task t(x, y, op, mymath);ringqueue->Push(t);// 输出提示std::cout << "生产者派发了一个任务:" << t.toTaskString() << std::endl;// sleep(1);}
}
void *ConsumerRoutine(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while (true){// version1// int data;// ringqueue->pop(&data);// std::cout << "消费完成,消费的数据是:" << data << std::endl;// version2Task t;ringqueue->Pop(&t);std::string result = t();std::cout << "消费者消费了一个任务:" << result << std::endl;// sleep(1);}
}int main()
{srand((unsigned int)time(nullptr) ^ getpid());// RingQueue<int> *rq = new RingQueue<int>();RingQueue<Task> *rq = new RingQueue<Task>();pthread_t p, c;pthread_create(&p, nullptr, ProducerRoutine, rq);pthread_create(&p, nullptr, ConsumerRoutine, rq);pthread_join(p, nullptr);pthread_join(c, nullptr);return 0;
}
生产者生产慢,消费者消费快: 生产者生产快,消费者消费慢:
消费者不关心任务是什么,这也是底层的设计和未来的业务做解耦。
上面我们写的是单生产单消费的生产者消费者模型,生产者消费者模型“321原则”只维护了生产者和消费者的互斥与同步关系。如果是多生产者、多消费者呢?生产者和生产者之间的互斥,消费者和消费者之间的互斥关系呢?并没有维护。
基于RingQueue的多生产者多消费者模型
- 如果我们要把上面的代码改成多生产者、多消费者呢?
很简单。只要保证,最终进入环形队列的是一个生产者,一个消费者就可以了!让生产者和生产者竞争,消费者和消费者竞争。
makefile:
ringqueue:main.ccg++ -o $@ $^ -std=c++11 -lpthread
.PHONY:clean
clean:rm -f ringqueue
RingQueue.hpp:
#pragma once
#include "Task.hpp"#include <iostream>
#include <vector>
#include <semaphore.h>
#include <cassert>
#include <pthread.h>static const int gmaxcap = 5;template <class T>
class RingQueue
{
public:RingQueue(const int &cap = gmaxcap): _queue(cap), _cap(cap){int n = sem_init(&_spaceSem, 0, _cap);assert(n == 0);n = sem_init(&_dataSem, 0, 0);assert(n == 0);_producerStep = _consumerStep = 0;pthread_mutex_init(&_pmutex, nullptr);pthread_mutex_init(&_cmutex, nullptr);}void P(sem_t &sem){int n = sem_wait(&sem);assert(n == 0); // if(void)n;}void V(sem_t &sem){int n = sem_post(&sem);assert(n == 0);(void)n;}// 生产者void Push(const T &in){// 先加锁,后申请信号量,还是先申请信号量,后加锁?// pthread_mutex_lock(&_pmutex);P(_spaceSem); // 申请到了空间信号量,意味着,生产者可以正常生产。pthread_mutex_lock(&_pmutex);_queue[_producerStep++] = in;_producerStep %= _cap;V(_dataSem);pthread_mutex_unlock(&_pmutex);}// 消费者void Pop(T *out){// pthread_mutex_lock(&_cmutex);P(_dataSem); // 申请到了数据信号量,意味着,消费者可以正常消费pthread_mutex_lock(&_cmutex);*out = _queue[_consumerStep++];_consumerStep %= _cap;V(_spaceSem);pthread_mutex_unlock(&_cmutex);}~RingQueue(){sem_destroy(&_spaceSem);sem_destroy(&_dataSem);pthread_mutex_destroy(&_pmutex);pthread_mutex_destroy(&_cmutex);}private:std::vector<T> _queue; // 数组模拟环形队列int _cap; // 环形队列容量sem_t _spaceSem; // 生产者,想生产,看重的是空间资源sem_t _dataSem; // 消费者,想消费,看重的是数据资源int _producerStep; // 生产数据的下标int _consumerStep; // 消费数据的下标pthread_mutex_t _pmutex; // 生产者的锁pthread_mutex_t _cmutex; // 消费者的锁
};
Task.hpp:
#pragma once#include <iostream>
#include <cstdio>
#include <string>
#include <functional>class Task
{using func_t = std::function<int(int, int, char)>;// typedef std::function<int(int,int,char)>func_t;
public:Task(){}Task(int x, int y, char op, func_t func): _x(x), _y(y), _op(op), _callback(func){}std::string operator()(){int result = _callback(_x, _y, _op);char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = %d", _x, _op, _y, result);return buffer;}std::string toTaskString(){char buffer[1024];snprintf(buffer, sizeof buffer, "%d %c %d = ?", _x, _op, _y);return buffer;}private:int _x;int _y;char _op;func_t _callback;
};const std::string oper = "+-*/%";int mymath(int x, int y, char op)
{int result = 0;switch (op){case '+':result = x + y;break;case '-':result = x - y;break;case '*':result = x * y;break;case '/':{if (y == 0){std::cerr << "div zero error!" << std::endl;result = -1;}else{result = x / y;}}break;case '%':{if (y == 0){std::cerr << "mod zero error!" << std::endl;result = -1;}else{result = x % y;}}break;default:break;}return result;
}
main.cc:
#include "RingQueue.hpp"
#include "Task.hpp"
#include <iostream>
#include <cstdlib>
#include <ctime>
#include <pthread.h>
#include <sys/types.h>
#include <unistd.h>std::string SelfName()
{char name[64];snprintf(name, sizeof(name), "thread[0x%0x]", pthread_self());return name;
}void *ProducerRoutine(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while (true){// version1// int data = rand() % 10 + 1;// ringqueue->push(data);// std::cout << "生产完成,生产的数据是:" << data << std::endl; // 和阻塞队列不同的是,如果生产和消费的不是同一个位置,生产和消费是可以同时并发执行的。// sleep(1);// version2// 构建or获取任务int x = rand() % 100 + 1;int y = rand() % 10;char op = oper[rand() % oper.size()];// 生产任务Task t(x, y, op, mymath);ringqueue->Push(t);// 输出提示std::cout << SelfName() << "生产者派发了一个任务:" << t.toTaskString() << std::endl;sleep(1);}
}
void *ConsumerRoutine(void *rq)
{// RingQueue<int> *ringqueue = static_cast<RingQueue<int> *>(rq);RingQueue<Task> *ringqueue = static_cast<RingQueue<Task> *>(rq);while (true){// version1// int data;// ringqueue->pop(&data);// std::cout << "消费完成,消费的数据是:" << data << std::endl;// version2Task t;ringqueue->Pop(&t);std::string result = t();std::cout << SelfName() << "消费者消费了一个任务:" << result << std::endl;//sleep(1);}
}int main()
{srand((unsigned int)time(nullptr) ^ getpid());// RingQueue<int> *rq = new RingQueue<int>();RingQueue<Task> *rq = new RingQueue<Task>();// 单生产者、单消费者// pthread_t p, c;// pthread_create(&p, nullptr, ProducerRoutine, rq);// pthread_create(&p, nullptr, ConsumerRoutine, rq);// pthread_join(p, nullptr);// pthread_join(c, nullptr);// 多生产者、多消费者pthread_t p[4], c[8];for (int i = 0; i < 4; i++){pthread_create(p + i, nullptr, ProducerRoutine, rq);}for (int i = 0; i < 8; i++){pthread_create(c + i, nullptr, ConsumerRoutine, rq);}for (int i = 0; i < 4; i++){pthread_join(p[i], nullptr);}for (int i = 0; i < 8; i++){pthread_join(c[i], nullptr);}return 0;
}
这样,也就实现出了基于RingQueue环形队列的多生产者多消费者模型了。
- 是先加锁,后申请信号量,还是先申请信号量,后加锁呢?
- 先加锁,再申请信号量。一个线程在加锁的时候,其他线程什么也干不了,只能静静地等待持有锁之后,再申请信号量。
- 先申请信号量,后加锁。一个线程申请信号量之后,再加锁进入临界区的同时,其他线程也可以申请信号量,在一定程度上提高了多线程申请信号量的并发性。
多生产者多消费者时,进入这个环形队列的,最少有1个线程,为空或为满的时候;最多有两个线程,一个生产者,一个消费者。
- 那么环形队列的多生产者多消费者的意义是什么呢?
和基于阻塞队列的多生产者、多消费者模型一样。
- 生产者在构建或获取任务时,未来可能要从文件、数据库、网络等获取数据来构建任务,是需要花费时间的,其他线程(生产者)可以同时并发的构建和获取任务!
- 消费者在收到任务以后,是要处理计算任务的,同样是需要花费时间的,其他线程(消费者)可以同时并发的处理计算任务!
也就是说,在生产之前,消费之后,多线程是可以并发执行的。
以上就是基于RingQueue环形队列的生产者消费者模型。