生产者-消费者问题
1、使用C语言和POSIX线程(pthread)库实现的生产者-消费者问题的示例,将使用互斥锁(mutex)和条件变量(condition variables)来实现同步。
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h> #define BUFFER_SIZE 5 int buffer[BUFFER_SIZE];
int count = 0;
pthread_mutex_t mutex;
pthread_cond_t not_empty;
pthread_cond_t not_full; void* producer(void* arg) { int item; while (1) { // 生产物品 item = rand() % 100; // 假设生产的是0到99之间的随机数 pthread_mutex_lock(&mutex); // 等待缓冲区不满 while (count == BUFFER_SIZE) { pthread_cond_wait(¬_full, &mutex); } // 将物品放入缓冲区 buffer[count] = item; count++; // 通知消费者 pthread_cond_signal(¬_empty); pthread_mutex_unlock(&mutex); sleep(1); // 模拟生产耗时 } return NULL;
} void* consumer(void* arg) { while (1) { pthread_mutex_lock(&mutex); // 等待缓冲区不为空 while (count == 0) { pthread_cond_wait(¬_empty, &mutex); } // 从缓冲区取出物品 int item = buffer[count - 1]; count--; // 通知生产者 pthread_cond_signal(¬_full); pthread_mutex_unlock(&mutex); printf("Consumed: %d\n", item); sleep(1); // 模拟消费耗时 } return NULL;
} int main() { pthread_t tid_producer, tid_consumer; pthread_mutex_init(&mutex, NULL); pthread_cond_init(¬_empty, NULL); pthread_cond_init(¬_full, NULL); pthread_create(&tid_producer, NULL, producer, NULL); pthread_create(&tid_consumer, NULL, consumer, NULL); pthread_join(tid_producer, NULL); pthread_join(tid_consumer, NULL); pthread_mutex_destroy(&mutex); pthread_cond_destroy(¬_empty); pthread_cond_destroy(¬_full); return 0;
}
注意:
- 在实际应用中,
pthread_join
在main
函数中通常不会这样使用,因为它会阻塞主线程直到指定的线程结束。由于生产者和消费者是无限循环的,这会导致程序无法退出。在实际应用中,可能需要一种方式来优雅地终止这些线程。 - 这个示例中使用了
sleep
函数来模拟生产和消费过程的时间消耗。 - 确保在编译时链接了pthread库,例如使用gcc的
-lpthread
选项。
2、在这个程序中,使用了信号量和互斥锁来实现生产者和消费者之间的同步。生产者不断生产物品并放入缓冲区,消费者不断从缓冲区中取出物品。信号量empty
和full
分别表示缓冲区的空位数和已占用位数,互斥锁mutex
用于保护对缓冲区的访问。
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>#define BUFFER_SIZE 5int buffer[BUFFER_SIZE];
int in = 0;
int out = 0;sem_t empty;
sem_t full;
pthread_mutex_t mutex;void *producer(void *arg) {int item;while (1) {item = rand() % 100;sem_wait(&empty);pthread_mutex_lock(&mutex);buffer[in] = item;printf("Producer produced: %d\n", item);in = (in + 1) % BUFFER_SIZE;pthread_mutex_unlock(&mutex);sem_post(&full);}return NULL;
}void *consumer(void *arg) {int item;while (1) {sem_wait(&full);pthread_mutex_lock(&mutex);item = buffer[out];printf("Consumer consumed: %d\n", item);out = (out + 1) % BUFFER_SIZE;pthread_mutex_unlock(&mutex);sem_post(&empty);}return NULL;
}int main() {pthread_t producer_thread, consumer_thread;sem_init(&empty, 0, BUFFER_SIZE);sem_init(&full, 0, 0);pthread_mutex_init(&mutex, NULL);pthread_create(&producer_thread, NULL, producer, NULL);pthread_create(&consumer_thread, NULL, consumer, NULL);pthread_join(producer_thread, NULL);pthread_join(consumer_thread, NULL);sem_destroy(&empty);sem_destroy(&full);pthread_mutex_destroy(&mutex);return 0;
}
3、利用记录型信号量解决生产者-消费者问题
(1)记录型信号量在POSIX线程中通常通过sem_t
类型和相关的操作函数(如sem_init
、sem_wait
、sem_post
、sem_destroy
)来实现。
#include <pthread.h>
#include <semaphore.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h> #define BUFFER_SIZE 5 int buffer[BUFFER_SIZE];
int count = 0;
sem_t empty; // 控制缓冲区是否为空的信号量
sem_t full; // 控制缓冲区是否已满的信号量
pthread_mutex_t mutex; // 还需要一个互斥锁来保护对缓冲区的共享访问 void* producer(void* arg) { while (1) { // 生产物品 int item = rand() % 100; sem_wait(&empty); // 等待缓冲区有空位 pthread_mutex_lock(&mutex); // 将物品放入缓冲区 buffer[count] = item; count++; pthread_mutex_unlock(&mutex); sem_post(&full); // 通知消费者缓冲区有物品 sleep(1); // 模拟生产耗时 } return NULL;
} void* consumer(void* arg) { while (1) { sem_wait(&full); // 等待缓冲区有物品 pthread_mutex_lock(&mutex); // 从缓冲区取出物品 int item = buffer[count - 1]; count--; pthread_mutex_unlock(&mutex); sem_post(&empty); // 通知生产者缓冲区有空位 printf("Consumed: %d\n", item); sleep(1); // 模拟消费耗时 } return NULL;
} int main() { pthread_t tid_producer, tid_consumer; sem_init(&empty, 0, BUFFER_SIZE); // 初始时,缓冲区为空,empty信号量为BUFFER_SIZE sem_init(&full, 0, 0); // 初始时,缓冲区没有物品,full信号量为0 pthread_mutex_init(&mutex, NULL); pthread_create(&tid_producer, NULL, producer, NULL); pthread_create(&tid_consumer, NULL, consumer, NULL); // 注意:这里不会调用pthread_join,因为生产者和消费者是无限循环的。 // 在实际应用中,可能需要一种机制来优雅地终止这些线程。 // 为了示例的完整性,这里只是简单地让主线程休眠一段时间。 // 在实际应用中,应该避免这样做。 sleep(10); // 让主线程休眠一段时间以观察生产者和消费者的行为 // 注意:这里并没有清理资源(如销毁信号量和互斥锁),因为程序将无限期运行。 // 在实际应用中,应该在适当的时候销毁这些资源。 return 0;
}
注意:
- 在这个示例中,使用了
sem_t
类型的信号量empty
和full
来控制缓冲区是否为空和是否已满。 - 还需要一个互斥锁
mutex
来保护对共享缓冲区buffer
和计数器count
的访问,以防止数据竞争。 - 请注意,这个示例中的生产者和消费者是无限循环的,这意味着程序将无限期地运行下去。在实际应用中,可能需要一种机制来优雅地终止这些线程。
- 使用了
sleep
函数来模拟生产和消费过程的时间消耗。 - 由于这个示例是为了展示目的而简化的,因此在实际应用中,应该在适当的时候清理资源(如销毁信号量和互斥锁)。然而,在这个示例中,由于程序将无限期运行,所以没有包含这些清理操作。
(2)定义了一个记录型信号量的结构体,包含信号量的值和对应的信号量指针。通过自定义的wait
和signal
函数来操作记录型信号量,实现生产者和消费者之间的同步。生产者在生产物品前等待空缓冲区信号量,生产后通知满缓冲区信号量;消费者在消费物品前等待满缓冲区信号量,消费后通知空缓冲区信号量。互斥锁用于保护对缓冲区的访问。
#include <unistd.h>
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <time.h> // 引入time.h来使用srand和sleep #define BUFFER_SIZE 5
#define ITERATIONS 10 // 定义迭代次数以模拟生产者和消费者的工作 int buffer[BUFFER_SIZE];
int in = 0, out = 0; sem_t empty;
sem_t full;
pthread_mutex_t mutex; void *producer(void *arg) { int item, iteration = 0; srand(time(NULL)); // 初始化随机数种子 while (iteration < ITERATIONS) { item = rand() % 100; sem_wait(&empty); pthread_mutex_lock(&mutex); buffer[in] = item; printf("Producer produced: %d\n", item); in = (in + 1) % BUFFER_SIZE; pthread_mutex_unlock(&mutex); sem_post(&full); iteration++; sleep(1); // 模拟生产耗时 } return NULL;
} void *consumer(void *arg) { int item, iteration = 0; while (iteration < ITERATIONS) { sem_wait(&full); pthread_mutex_lock(&mutex); item = buffer[out]; printf("Consumer consumed: %d\n", item); out = (out + 1) % BUFFER_SIZE; pthread_mutex_unlock(&mutex); sem_post(&empty); iteration++; sleep(2); // 模拟消费耗时 } return NULL;
} int main() { pthread_t producer_thread, consumer_thread; sem_init(&empty, 0, BUFFER_SIZE); sem_init(&full, 0, 0); pthread_mutex_init(&mutex, NULL); pthread_create(&producer_thread, NULL, producer, NULL); pthread_create(&consumer_thread, NULL, consumer, NULL); pthread_join(producer_thread, NULL); pthread_join(consumer_thread, NULL); sem_destroy(&empty); sem_destroy(&full); pthread_mutex_destroy(&mutex); return 0;
}
4、 利用AND型信号量解决生产者-消费者问题
(1)在这个代码中,使用三个信号量mutex
、empty
和full
来实现生产者 - 消费者问题的同步。mutex
用于保护对缓冲区的互斥访问,empty
表示缓冲区中的空位置数量,full
表示缓冲区中的已占用位置数量。生产者在生产物品前先等待empty
信号量,然后获取mutex
互斥锁,将物品放入缓冲区,释放互斥锁并增加full
信号量。消费者在消费物品前先等待full
信号量,然后获取mutex
互斥锁,从缓冲区中取出物品,释放互斥锁并增加empty
信号量。
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <semaphore.h>#define BUFFER_SIZE 5int buffer[BUFFER_SIZE];
int in = 0;
int out = 0;sem_t mutex, empty, full;void *producer(void *arg) {int item;while (1) {item = rand() % 100;sem_wait(&empty);sem_wait(&mutex);buffer[in] = item;printf("Producer produced: %d\n", item);in = (in + 1) % BUFFER_SIZE;sem_post(&mutex);sem_post(&full);}return NULL;
}void *consumer(void *arg) {int item;while (1) {sem_wait(&full);sem_wait(&mutex);item = buffer[out];printf("Consumer consumed: %d\n", item);out = (out + 1) % BUFFER_SIZE;sem_post(&mutex);sem_post(&empty);}return NULL;
}int main() {pthread_t producer_thread, consumer_thread;sem_init(&mutex, 0, 1);sem_init(&empty, 0, BUFFER_SIZE);sem_init(&full, 0, 0);pthread_create(&producer_thread, NULL, producer, NULL);pthread_create(&consumer_thread, NULL, consumer, NULL);pthread_join(producer_thread, NULL);pthread_join(consumer_thread, NULL);sem_destroy(&mutex);sem_destroy(&empty);sem_destroy(&full);return 0;
}
(2)在C语言中,使用AND型信号量(也称为“复合信号量”或“多信号量同步”)直接实现可能不是最直观的方法,因为标准的POSIX线程库(pthread)并不直接支持AND型信号量。然而,可以通过组合使用普通的信号量(sem_t
)和互斥锁(pthread_mutex_t
)来模拟AND型信号量的行为。
在生产者-消费者问题中,AND型信号量通常用于确保在继续执行之前,多个条件都必须被满足。但在这个场景中,主要关心的是缓冲区的空和满状态,这通常可以通过两个独立的信号量(一个表示空槽位,另一个表示有数据的槽位)来实现。
不过,为了说明如何模拟AND型信号量的概念,可以考虑一个场景,其中生产者在继续生产之前需要等待两个条件:缓冲区未满,且另一个特定的条件(比如一个“允许生产”的信号量)也满足。
使用了两个信号量(empty
和 full
)和一个互斥锁(mutex
),但还将引入一个额外的信号量 permit
来模拟AND型信号量的一个方面。
#include <stdio.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdlib.h>
#include <unistd.h> #define BUFFER_SIZE 5 int buffer[BUFFER_SIZE];
int in = 0, out = 0; sem_t empty;
sem_t full;
sem_t permit; // 额外的信号量,用于模拟AND型信号量的一个方面
pthread_mutex_t mutex; void *producer(void *arg) { int item; while (1) { // 等待两个条件:缓冲区有空位,且获得生产许可 sem_wait(&empty); sem_wait(&permit); pthread_mutex_lock(&mutex); item = rand() % 100; buffer[in] = item; printf("Producer produced: %d\n", item); in = (in + 1) % BUFFER_SIZE; pthread_mutex_unlock(&mutex); // 释放条件:缓冲区现在有一个新项 sem_post(&full); // 这里并不真正“释放”permit,因为它可能由外部逻辑控制 // 在实际使用中,可能需要在适当的时候从另一个线程释放它 } return NULL;
} void *consumer(void *arg) { int item; while (1) { sem_wait(&full); pthread_mutex_lock(&mutex); item = buffer[out]; printf("Consumer consumed: %d\n", item); out = (out + 1) % BUFFER_SIZE; pthread_mutex_unlock(&mutex); sem_post(&empty); // 消费者不直接涉及permit信号量 } return NULL;
} int main() { pthread_t producer_thread, consumer_thread; sem_init(&empty, 0, BUFFER_SIZE); sem_init(&full, 0, 0); sem_init(&permit, 0, 1); // 初始时允许生产 pthread_mutex_init(&mutex, NULL); pthread_create(&producer_thread, NULL, producer, NULL); pthread_create(&consumer_thread, NULL, consumer, NULL); // 注意:这里不应该在main中join这些线程,因为它们会无限循环 // 只是为了示例,让main线程休眠一段时间然后退出 sleep(10); // 在实际应用中,需要一种方法来优雅地停止这些线程 // 清理资源(但在实际程序中,这应该在所有线程都停止之后进行) // sem_destroy(&empty); // sem_destroy(&full); // sem_destroy(&permit); // pthread_mutex_destroy(&mutex); return 0;
}
注意:上面的代码示例中,permit
信号量被用作一个示例来模拟AND型信号量的一个方面,但在实际的生产者-消费者问题中,它可能不是必需的。此外,由于生产者和消费者线程在示例中是无限循环的,main
函数使用了 sleep(10);
来模拟程序运行一段时间。在真实的应用程序中,需要一种机制来优雅地停止这些线程,比如使用条件变量或其他同步机制。最后,请记得在实际应用程序中,应该在所有线程都正确停止之后,再销毁信号量和互斥锁。在上面的示例中,由于线程是无限循环的,因此注释掉了销毁信号量和互斥锁的代码。