文章目录
- 1. 并发控制高级工具简介
- 1.1 CountDownLatch
- 1.2 CyclicBarrier
- 1.3 Semaphore
- 1.4 并发设计模式
- 2. 扩展生产者—消费者示例
- 2.1 示例代码
- 3. 代码详解
- 3.1 主类 ExtendedProducerConsumerDemo
- 3.2 Buffer 类
- 3.3 Producer 类
- 3.4 Consumer 类
- 4. 编译与运行结果
- 4.1 编译
- 4.2 运行
- 5. 总结与思考
1. 并发控制高级工具简介
1.1 CountDownLatch
- 作用
CountDownLatch 用于让一个或多个线程等待其他线程完成某些操作。 - 使用方式
创建时指定计数值,每当某个线程调用 countDown() 后,计数器减一;调用 await() 的线程会阻塞,直到计数器归零。 - 示例场景
等待所有工作线程启动完毕后,再同时开始执行后续流程。
1.2 CyclicBarrier
- 作用
CyclicBarrier 用于让一组线程在某一点汇聚,所有线程达到屏障后再一起继续执行。 - 使用方式
在构造时设定参与线程数量,可选择设置一个“屏障动作”(一个 Runnable),在所有线程到达后自动执行。 - 示例场景
多个生产者每生产完一次“周期”产品后,等待其它生产者,以便统一开始下一周期生产。
1.3 Semaphore
- 作用
Semaphore 用于限制对共享资源的并发访问线程数。 - 使用方式
通过 acquire() 获取许可,如果没有许可则阻塞;通过 release() 释放许可。 - 示例场景
控制同时进行生产操作的生产者数量,避免对有限资源的竞争。
1.4 并发设计模式
- 生产者—消费者模式
经典模式中生产者不断产生数据,消费者不断消费数据,通常需要借助队列和线程同步机制协同工作。 - 读写锁
通过 ReentrantReadWriteLock 实现读与写操作分离,提高并发度(适用于读远多于写的场景)。
2. 扩展生产者—消费者示例
以下示例扩展了传统生产者—消费者模型,加入了三种并发工具以应对复杂场景:
CountDownLatch
:所有生产者先“就绪”,然后统一启动生产。Semaphore
:限制同时进行生产操作的生产者数。CyclicBarrier
:每个生产周期结束后,等待所有生产者完成,进行周期性的同步(屏障触发时打印提示信息)。
2.1 示例代码
将以下代码保存为一个 Java 文件(例如 ExtendedProducerConsumerDemo.java),或拆分为多个类文件后按包编译运行。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;// 主类入口
public class ExtendedProducerConsumerDemo {public static void main(String[] args) {// 创建共享缓冲区(容量设为10)Buffer buffer = new Buffer(10);int numProducers = 3;int numConsumers = 2;// CountDownLatch 用于等待所有生产者线程准备就绪CountDownLatch startLatch = new CountDownLatch(numProducers);// CyclicBarrier 用于同步所有生产者的周期生产,所有生产者到达屏障后执行屏障动作CyclicBarrier barrier = new CyclicBarrier(numProducers, () -> {System.out.println("【屏障触发】所有生产者完成本周期生产,开启新周期!");});// Semaphore 控制同时进入生产过程的生产者数(例如最多2个)Semaphore semaphore = new Semaphore(2);// 启动生产者线程for (int i = 1; i <= numProducers; i++) {Producer producer = new Producer("Producer-" + i, buffer, startLatch, barrier, semaphore);new Thread(producer).start();}// 启动消费者线程for (int i = 1; i <= numConsumers; i++) {Consumer consumer = new Consumer("Consumer-" + i, buffer);new Thread(consumer).start();}}
}// 共享缓冲区类:使用 synchronized 及 wait/notify 保护共享队列
class Buffer {private Queue<Integer> queue = new LinkedList<>();private int capacity;public Buffer(int capacity) {this.capacity = capacity;}// 生产数据操作public synchronized void produce(int value) throws InterruptedException {while(queue.size() == capacity) {System.out.println(Thread.currentThread().getName() + " 等待,缓冲区已满!");wait();}queue.add(value);System.out.println(Thread.currentThread().getName() + " produced: " + value);notifyAll();}// 消费数据操作public synchronized int consume() throws InterruptedException {while(queue.isEmpty()) {System.out.println(Thread.currentThread().getName() + " 等待,缓冲区为空!");wait();}int value = queue.poll();System.out.println(Thread.currentThread().getName() + " consumed: " + value);notifyAll();return value;}
}// 生产者类:使用 CountDownLatch、Semaphore 与 CyclicBarrier 进行生产同步
class Producer implements Runnable {private String name;private Buffer buffer;private CountDownLatch startLatch;private CyclicBarrier barrier;private Semaphore semaphore;private static int globalItemCounter = 0; // 用于生成生产的唯一数据public Producer(String name, Buffer buffer, CountDownLatch startLatch, CyclicBarrier barrier, Semaphore semaphore) {this.name = name;this.buffer = buffer;this.startLatch = startLatch;this.barrier = barrier;this.semaphore = semaphore;}@Overridepublic void run() {Thread.currentThread().setName(name);// 表示当前生产者已就绪,并倒计时System.out.println(name + " is ready.");startLatch.countDown();try {// 等待所有生产者就绪,然后统一启动startLatch.await();} catch (InterruptedException e) {e.printStackTrace();}while(true) {try {// 限制同时进行生产操作的数量semaphore.acquire();int item = produceItem();buffer.produce(item);semaphore.release();// 生产者完成本周期生产后,在屏障处等待其他生产者barrier.await();// 模拟生产周期间隔Thread.sleep(500);} catch (Exception e) {e.printStackTrace();}}}// 模拟生成产品(用同步保证全局计数器线程安全)private synchronized int produceItem() {return ++globalItemCounter;}
}// 消费者类:不断从缓冲区中消费产品
class Consumer implements Runnable {private String name;private Buffer buffer;public Consumer(String name, Buffer buffer) {this.name = name;this.buffer = buffer;}@Overridepublic void run() {Thread.currentThread().setName(name);while(true) {try {int item = buffer.consume();// 模拟消费耗时Thread.sleep(1000);} catch(Exception e) {e.printStackTrace();}}}
}
3. 代码详解
3.1 主类 ExtendedProducerConsumerDemo
- Buffer 创建
Buffer buffer = new Buffer(10); 创建一个容量为 10 的共享缓冲区,用于存放生产的数据。 - CountDownLatch
通过 new CountDownLatch(numProducers) 创建 latch,用于确保所有 3 个生产者线程都已准备就绪后再同时开始生产。 - CyclicBarrier
new CyclicBarrier(numProducers, barrierAction) 创建一个屏障,当 3 个生产者均调用 await() 后,自动执行屏障动作(打印提示信息),然后各生产者继续下一周期。 - Semaphore
设定同时允许最多 2 个生产者进入生产操作,模拟对有限资源的访问控制。 - 启动线程
主方法中循环创建生产者和消费者线程,并分别启动。
3.2 Buffer 类
- 与之前生产者—消费者示例类似,使用了 synchronized 的 produce/consume 方法,利用 wait/notifyAll 控制对共享队列的访问,防止数据竞争。
3.3 Producer 类
- 就绪等待
每个生产者启动后立即调用startLatch.countDown()
表示自己已就绪,然后通过startLatch.await() 等
待所有生产者到位。 - Semaphore 限流
在每次生产时,先调用semaphore.acquire()
获取许可,保证同一时刻最多只有 2 个生产者在进行产品生成与缓冲区写入操作;生产完成后释放许可。 - 生成产品
利用 synchronized 方法produceItem()
确保全局计数器globalItemCounter
正确递增,生成唯一数据。 - CyclicBarrier 同步
每个生产者调用barrier.await()
后,将等待其他生产者达到屏障,屏障触发后屏障动作会打印提示,完成一次“生产周期”同步。
3.4 Consumer 类
- 不断循环调用
buffer.consume()
消费产品,并模拟消费延时,每次消费后线程休眠 1 秒,使生产与消费速率形成差异,便于观察各同步工具的效果。
4. 编译与运行结果
4.1 编译
-
在 IntelliJ IDEA 中创建一个 Java 项目,将上述所有代码保存到一个文件中(或分成多个文件后按包编译)。
-
使用 IDE 的编译功能或在命令行下使用
javac ExtendedProducerConsumerDemo.java
编译。
4.2 运行
运行主类 ExtendedProducerConsumerDemo
后,你将在控制台看到类似如下的输出(具体顺序因线程调度和系统环境而异):
Producer-1 is ready.
Producer-2 is ready.
Producer-3 is ready.
Producer-1 produced: 1
Producer-2 produced: 2
Producer-3 produced: 3
【屏障触发】所有生产者完成本周期生产,开启新周期!
Consumer-1 consumed: 1
Consumer-2 consumed: 2
Producer-1 produced: 4
Producer-2 produced: 5
Producer-3 produced: 6
【屏障触发】所有生产者完成本周期生产,开启新周期!
Consumer-1 consumed: 3
Consumer-2 consumed: 4
...
说明:
- 生产者就绪: 启动时首先打印各生产者“is ready”,并通过 CountDownLatch 同步后,再开始生产。
- Semaphore 限制 :同一时刻只有部分生产者进入生产区间,顺序可能受限。
- CyclicBarrier 同步 :每当所有生产者完成当前“生产周期”(即各自都到达 barrier.await()),会触发屏障动作,打印提示信息,然后各自继续下一次生产。
- 消费者输出 :消费者不断消费数据,输出消费记录。由于生产和消费速率不同,缓冲区可能时而空、时而满,从而触发相应的等待提示信息。
5. 总结与思考
- 高级并发工具的应用
通过 CountDownLatch、CyclicBarrier、Semaphore,我们可以对线程的启动顺序、周期性同步以及并发访问控制进行更精细的管理,为复杂场景下的并发编程提供有力支持。 - 设计模式
扩展生产者—消费者模型展示了如何在经典模式中引入更多同步工具,使系统在并发环境中更稳健,同时也为其他设计模式(如读写锁模式)提供借鉴。