在 C# 中实现 生产者-消费者模式,通常需要多个线程来处理数据的生产和消费。我们可以使用 Queue<T>
来作为存储数据的队列,并使用 Thread
、Mutex
或 Monitor
来确保线程安全。BlockingCollection<T>
是 C# 提供的一个线程安全的集合,可以非常方便地用于实现生产者-消费者模式。
生产者-消费者模式的关键点:
- 生产者线程:产生数据并将其放入队列中。
- 消费者线程:从队列中取出数据并进行处理。
- 线程同步:使用
BlockingCollection<T>
等线程安全的集合来避免竞争条件,同时确保生产者和消费者之间的协调。
示例:使用 BlockingCollection<T>
C# 提供了 BlockingCollection<T>
类,它可以用来在生产者和消费者线程之间提供同步机制。它是一个线程安全的集合,并支持阻塞操作,因此可以自动协调生产者和消费者的行为。
代码示例:
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;class Program
{// 使用 BlockingCollection 实现线程安全的队列static BlockingCollection<int> queue = new BlockingCollection<int>(5); // 队列最大容量为5// 生产者线程static void Producer(){int item = 0;while (true){Thread.Sleep(500); // 模拟生产延迟// 生产数据并加入队列queue.Add(item);Console.WriteLine("生产者生产数据: " + item);item++;}}// 消费者线程static void Consumer(){while (true){int item = queue.Take(); // 阻塞直到队列中有数据Console.WriteLine("消费者消费数据: " + item);Thread.Sleep(1000); // 模拟消费延迟}}static void Main(){// 启动生产者线程Thread producerThread = new Thread(Producer);producerThread.Start();// 启动消费者线程Thread consumerThread = new Thread(Consumer);consumerThread.Start();// 等待线程结束(实际上,生产者和消费者线程会永远运行下去)producerThread.Join();consumerThread.Join();}
}
代码解释:
BlockingCollection<int> queue
:一个线程安全的队列,最大容量为 5。BlockingCollection
会在队列满时阻塞生产者线程,在队列为空时阻塞消费者线程。Producer()
:模拟生产者线程,每 500 毫秒生成一个数据并放入队列中。如果队列已满,Add
操作会阻塞生产者线程,直到队列有空位。Consumer()
:模拟消费者线程,每秒消费一个数据。Take()
会阻塞直到队列中有数据。Thread.Sleep()
:用来模拟生产和消费的延迟。
BlockingCollection<T>
的关键方法:
Add(T item)
:将项目添加到集合中。如果集合已满,它将阻塞直到有空余空间。Take()
:从集合中移除并返回一个项。如果集合为空,它将阻塞直到有可用项。TryAdd(T item)
:尝试将项目添加到集合中。如果成功则返回true
,否则返回false
,不会阻塞。TryTake(out T item)
:尝试从集合中移除并返回一个项。如果集合为空,返回false
。
扩展:多个生产者和多个消费者
BlockingCollection<T>
支持多个生产者和多个消费者,并且可以通过它来轻松实现复杂的生产者-消费者模型。你只需要启动多个线程来执行生产者和消费者的逻辑即可。
示例:多个生产者和多个消费者
using System;
using System.Collections.Concurrent;
using System.Threading;class Program
{static BlockingCollection<int> queue = new BlockingCollection<int>(5); // 队列最大容量为5// 生产者线程static void Producer(int id){int item = 0;while (true){Thread.Sleep(500); // 模拟生产延迟// 生产数据并加入队列queue.Add(item);Console.WriteLine($"生产者 {id} 生产数据: {item}");item++;}}// 消费者线程static void Consumer(int id){while (true){int item = queue.Take(); // 阻塞直到队列中有数据Console.WriteLine($"消费者 {id} 消费数据: {item}");Thread.Sleep(1000); // 模拟消费延迟}}static void Main(){// 启动多个生产者线程for (int i = 1; i <= 2; i++){int producerId = i;new Thread(() => Producer(producerId)).Start();}// 启动多个消费者线程for (int i = 1; i <= 3; i++){int consumerId = i;new Thread(() => Consumer(consumerId)).Start();}// 主线程等待Console.ReadLine();}
}
代码解释:
- 多个生产者线程:在
Main()
方法中,启动了 2 个生产者线程。每个线程调用Producer()
方法,生成不同的数据并将其放入共享队列。 - 多个消费者线程:启动了 3 个消费者线程,它们从同一个共享队列中取出数据进行处理。
运行结果:
生产者 1 生产数据: 0
生产者 2 生产数据: 0
消费者 1 消费数据: 0
生产者 1 生产数据: 1
消费者 2 消费数据: 1
消费者 3 消费数据: 2
...
总结:
BlockingCollection<T>
是 C# 中实现生产者-消费者模式的理想工具。它是线程安全的,支持阻塞操作,且可以容纳多个生产者和消费者。- 通过
BlockingCollection<T>
的Add
和Take
方法,生产者和消费者可以安全地进行数据交换而无需担心并发问题。 - 使用多个生产者和消费者线程时,
BlockingCollection<T>
会自动处理队列的同步和线程间协调。