文章目录
- 引言
- 理解生产者消费者模型
- 基于BlockingQueue的生产者消费者模型
- 单生产,单消费模型
- 多生产、多消费模型
引言
生产者消费者模型一般可以在超市中听到,例如如下是一个专门卖方便面的超市,这个超市有自己供应商,也有客户来买,客户称之为消费者。超市起到一个缓存作用,供应商放假的时候,短时间内超市依然有对应的商品,消费者依然可以消费;相同的,如果短时间内消费者不来买东西,供应商依然可以供应给超市。也就是说,供应商生产产品比较慢,可以先生成一批产品放在超市中;供应商如果供应比较快,可以等消费者消费一段时间再去供应产品,协调忙线不均。现实生活中,在人口密集的地方肯定会有超市,生产者消费者模型效率高,有了超市这个巨大的缓存,可以使得消费者和生产者并发起来。
个别消费者不想买方便面不会影响到供应商,个别供应商出现了问题,不会影响消费者买方便面,这就做到了生产者和消费者的解耦。
理解生产者消费者模型
上述例子对应到计算机中,供应商和消费者就是线程,超市是一段内存空间,方便面是数据。生产线程将数据交到一段内存空间中,消费线程从内存空间中将数据拿走。
“321原则”:
- 一个交易场所(特定数据结构的形式存在的一段内存空间)
- 两种角色:生产者、消费者,也就是生产线程和消费线程
- 三种关系:生产和生产(互斥关系)、消费和消费(互斥关系)、生产和消费(互斥关系、同步关系)
实现生产者消费者模型本质就是通过代码实现“321原则”,用锁和条件变量(或者其他形式)来实现三种关系。
基于BlockingQueue的生产者消费者模型
在多线程编程中阻塞队列(Blocking Queue)是一种常用于实现生产者和消费者模型的数据结构。其与普通的队列区别在于,当队列为空时,从队列获取元素的操作将会被阻塞,直到队列中被放入了元素;当队列满时,往队列里存放元素的操作也会被阻塞,直到有元素被从队列中取出(以上的操作都是基于不同的线程来说的,线程在对阻塞队列进程操作时会被阻塞)
单生产,单消费模型
//BlockQueue.hpp
#pragma once#include<iostream>
#include<string>
#include<queue>
#include<pthread.h>const static int defaultcap=5;template<typename T>
class BlockQueue
{
private:bool isFull(){return _block_queue.size()==_max_cap;}bool isEmpty(){return _block_queue.empty();}public:BlockQueue(int cap= defaultcap):_max_cap(cap){pthread_mutex_init(&_mutex,nullptr);pthread_cond_init(&_P_cond,nullptr);pthread_cond_init(&_C_cond,nullptr);}void Pop(T *out){pthread_mutex_lock(&_mutex);while(isEmpty()){pthread_cond_wait(&_C_cond,&_mutex);}*out=_block_queue.front();_block_queue.pop();pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_P_cond); //唤醒生产者}void Equeue(const T &in){pthread_mutex_lock(&_mutex);while(isFull()) //阻塞队列满{//满了生产者不能再生产,必须等待pthread_cond_wait(&_P_cond,&_mutex); //被调用的时候,除了让自己继续排队等待,还会释放自己传递的锁//函数返回时,会返回在临界区,必须先参与锁的竞争,重新加上锁,该函数才会返回,依然是持有锁的状态}//阻塞队列未满或者被唤醒_block_queue.push(in); //生产数据到阻塞队列pthread_mutex_unlock(&_mutex);pthread_cond_signal(&_C_cond); //唤醒消费者}~BlockQueue(){pthread_mutex_destroy(&_mutex);pthread_cond_destroy(&_P_cond);pthread_cond_destroy(&_C_cond);}private:std::queue<T> _block_queue; //临界资源int _max_cap;pthread_mutex_t _mutex;pthread_cond_t _P_cond; //生产者条件变量pthread_cond_t _C_cond; //消费者条件变量
};
Pop
函数:从队列中取出元素,并将其存储在 out
指针指向的地址中。步骤如下:
- 锁定互斥量:通过
pthread_mutex_lock(&_mutex)
确保对队列的操作是线程安全的。 - 等待条件变量:如果队列为空,使用
pthread_cond_wait(&_C_cond, &_mutex)
等待消费者条件变量被信号唤醒。 - 取出元素:从队列中取出前面的元素,并将其弹出。
- 解锁互斥量:通过
pthread_mutex_unlock(&_mutex)
解锁。 - 唤醒生产者:使用
pthread_cond_signal(&_P_cond)
唤醒可能被阻塞的生产者线程。
Equeue
函数:将元素 in
插入队列。步骤如下:
- 锁定互斥量:通过
pthread_mutex_lock(&_mutex)
确保对队列的操作是线程安全的。 - 等待条件变量:如果队列已满,使用
pthread_cond_wait(&_P_cond, &_mutex)
等待生产者条件变量被信号唤醒。 - 插入元素:将新元素插入到队列中。
- 解锁互斥量:通过
pthread_mutex_unlock(&_mutex)
解锁。 - 唤醒消费者:使用
pthread_cond_signal(&_C_cond)
唤醒可能被阻塞的消费者线程。
为了体现阻塞队列的特点,分别设计了两种测试代码:
- 生产一个,消费一个
#include"BlockQueue.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>void *Consumer(void *args)
{BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);while(true){//获取数据int data=0;bq->Pop(&data);//处理数据std::cout<<"Coumer -> "<<data<<std::endl;}
}void *Productor(void *args)
{srand(time(nullptr)^getpid());BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);while(true){sleep(2);//构建数据int data=rand()%10+1; // [1,10]//生产数据bq->Equeue(data);std::cout<<"Productor -> "<<data<<std::endl;}
}int main()
{BlockQueue<int> *bq=new BlockQueue<int>();pthread_t c,p;pthread_create(&c,nullptr,Consumer,bq);pthread_create(&p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
- 先生产一批数据,直到队列开始阻塞,然后消费一个,生产一个
#include"BlockQueue.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>void *Consumer(void *args)
{BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);while(true){sleep(2);//获取数据int data=0;bq->Pop(&data);//处理数据std::cout<<"Coumer -> "<<data<<std::endl;}
}void *Productor(void *args)
{srand(time(nullptr)^getpid());BlockQueue<int> *bq=static_cast<BlockQueue<int> *>(args);while(true){//构建数据int data=rand()%10+1; // [1,10]//生产数据bq->Equeue(data);std::cout<<"Productor -> "<<data<<std::endl;}
}int main()
{BlockQueue<int> *bq=new BlockQueue<int>();pthread_t c,p;pthread_create(&c,nullptr,Consumer,bq);pthread_create(&p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
上述测试代码是传递一个int
类型的数据到阻塞队列中,也可以传递其他类型,在传递struct
或者class
类型时,可以封装成一个个的任务传递到阻塞队列中。
- 传递任务:
//Task.hpp
#pragma once
#include<iostream>
#include<string>class Task
{public:Task(){}Task(int x,int y):_x(x),_y(y){}void Excute(){_result=_x+_y;}std::string debug(){std::string msg=std::to_string(_x)+"+"+std::to_string(_y)+"=?";return msg;}std::string result(){std::string msg=std::to_string(_x)+"+"+std::to_string(_y)+"="+std::to_string(_result);return msg;}private:int _x;int _y;int _result;
};
#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>void *Consumer(void *args)
{BlockQueue<Task> *bq=static_cast<BlockQueue<Task> *>(args);while(true){sleep(2);//获取数据Task t;bq->Pop(&t);// bq->Pop(&data);//处理数据t.Excute();std::cout<<"Coumer -> "<<t.result()<<std::endl;}
}void *Productor(void *args)
{srand(time(nullptr)^getpid());BlockQueue<Task> *bq=static_cast<BlockQueue<Task> *>(args);while(true){//构建数据int x=rand()%10+1;usleep(x*1000);int y=rand()%10+1;Task t(x,y);//生产数据bq->Equeue(t);std::cout<<"Productor -> "<<t.debug()<<std::endl;}
}int main()
{BlockQueue<Task> *bq=new BlockQueue<Task>();pthread_t c,p;pthread_create(&c,nullptr,Consumer,bq);pthread_create(&p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
- 传递函数任务:
//Task.hpp
#pragma once
#include<iostream>
#include<string>
#include<functional>using task_t=std::function<void()>;void Download()
{std::cout<<"I am Download task"<<std::endl;
}
//main.cc
#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>void *Consumer(void *args)
{BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);while(true){sleep(2);//获取数据task_t t;bq->Pop(&t);//处理数据t();}
}void *Productor(void *args)
{srand(time(nullptr)^getpid());BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);while(true){bq->Equeue(Download);std::cout<<"Productor -> Download "<<std::endl;}
}int main()
{BlockQueue<task_t> *bq=new BlockQueue<task_t>();pthread_t c,p;pthread_create(&c,nullptr,Consumer,bq);pthread_create(&p,nullptr,Productor,bq);pthread_join(c,nullptr);pthread_join(p,nullptr);return 0;
}
多生产、多消费模型
创建两个消费者线程 c1
和 c2
,它们会并行地从队列中取出任务并处理。创建三个生产者线程 p1
、p2
和 p3
,它们会并行地将任务放入队列中。
//main.cc#include"BlockQueue.hpp"
#include"Task.hpp"
#include<ctime>
#include<pthread.h>
#include<unistd.h>void *Consumer(void *args)
{BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);while(true){//获取数据task_t t;bq->Pop(&t);t();}
}void *Productor(void *args)
{srand(time(nullptr)^getpid());BlockQueue<task_t> *bq=static_cast<BlockQueue<task_t> *>(args);while(true){bq->Equeue(Download);std::cout<<"Productor -> Download "<<std::endl;sleep(1);}
}int main()
{BlockQueue<task_t> *bq=new BlockQueue<task_t>();pthread_t c1,c2,p1,p2,p3;pthread_create(&c1,nullptr,Consumer,bq);pthread_create(&c2,nullptr,Consumer,bq);pthread_create(&p1,nullptr,Productor,bq);pthread_create(&p1,nullptr,Productor,bq);pthread_create(&p3,nullptr,Productor,bq);pthread_join(c1,nullptr);pthread_join(c2,nullptr);pthread_join(p1,nullptr);pthread_join(p2,nullptr);pthread_join(p3,nullptr);return 0;
}