生产者-消费者模式
最经典的线程协作模式。
先看问题
缓冲区
生产者 ──────► [ ][ ][ ][ ][ ] ──────► 消费者
放数据 取数据- 生产者:制造数据,放到缓冲区
- 消费者:从缓冲区取数据,消费掉
- 缓冲区:平衡两者的速度差
核心问题:
- 缓冲区满了,生产者要等
- 缓冲区空了,消费者要等
- 多个线程同时操作缓冲区,必须线程安全
实现方式总览
| 方式 | 代码量 | 功能 | 推荐度 |
|---|---|---|---|
| synchronized + wait/notify | 多 | 基础 | ⭐⭐⭐ |
| Lock + Condition | 多 | 灵活 | ⭐⭐⭐⭐ |
| BlockingQueue | 少 | 开箱即用 | ⭐⭐⭐⭐⭐ |
| Semaphore | 多 | 底层控制 | ⭐⭐ |
方式一:synchronized + wait/notify
java
public class ProducerConsumerSync {
private static final int CAPACITY = 5;
private final Queue<Integer> buffer = new LinkedList<>();
public static void main(String[] args) {
ProducerConsumerSync demo = new ProducerConsumerSync();
// 2 个生产者
for (int i = 1; i <= 2; i++) {
final int id = i;
new Thread(() -> demo.produce(id), "P-" + i).start();
}
// 3 个消费者
for (int i = 1; i <= 3; i++) {
final int id = i;
new Thread(() -> demo.consume(id), "C-" + i).start();
}
}
public void produce(int id) {
for (int i = 1; i <= 5; i++) {
synchronized (buffer) {
while (buffer.size() >= CAPACITY) {
try {
buffer.wait(); // 队列满,等
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
buffer.offer(i);
System.out.println("生产者 " + id + " 生产: " + i);
buffer.notifyAll(); // 通知消费者
}
try {
Thread.sleep((long) (Math.random() * 200 + 100));
} catch (InterruptedException e) {}
}
}
public void consume(int id) {
for (int i = 0; i < 3; i++) {
synchronized (buffer) {
while (buffer.isEmpty()) {
try {
buffer.wait(); // 队列空,等
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
int value = buffer.poll();
System.out.println("消费者 " + id + " 消费: " + value);
buffer.notifyAll(); // 通知生产者
}
try {
Thread.sleep((long) (Math.random() * 300 + 200));
} catch (InterruptedException e) {}
}
}
}方式二:Lock + Condition
java
public class ProducerConsumerLock {
private static final int CAPACITY = 5;
private final Queue<Integer> buffer = new LinkedList<>();
private final Lock lock = new ReentrantLock();
private final Condition notFull = lock.newCondition();
private final Condition notEmpty = lock.newCondition();
public static void main(String[] args) {
ProducerConsumerLock demo = new ProducerConsumerLock();
for (int i = 1; i <= 2; i++) {
final int id = i;
new Thread(() -> demo.produce(id), "P-" + i).start();
}
for (int i = 1; i <= 3; i++) {
final int id = i;
new Thread(() -> demo.consume(id), "C-" + i).start();
}
}
public void produce(int id) {
for (int i = 1; i <= 5; i++) {
lock.lock();
try {
while (buffer.size() >= CAPACITY) {
notFull.await(); // 等「不满」
}
buffer.offer(i);
System.out.println("生产者 " + id + " 生产: " + i);
notEmpty.signal(); // 通知「非空」
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
}
public void consume(int id) {
for (int i = 0; i < 3; i++) {
lock.lock();
try {
while (buffer.isEmpty()) {
notEmpty.await(); // 等「非空」
}
int value = buffer.poll();
System.out.println("消费者 " + id + " 消费: " + value);
notFull.signal(); // 通知「不满」
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
lock.unlock();
}
}
}
}方式三:BlockingQueue(推荐)
最简洁,生产者和消费者完全解耦:
java
public class ProducerConsumerBQ {
private final BlockingQueue<Integer> queue;
public ProducerConsumerBQ() {
this.queue = new LinkedBlockingQueue<>(5); // 有界队列
}
public static void main(String[] args) {
ProducerConsumerBQ demo = new ProducerConsumerBQ();
for (int i = 1; i <= 2; i++) {
final int id = i;
new Thread(() -> demo.produce(id), "P-" + i).start();
}
for (int i = 1; i <= 3; i++) {
final int id = i;
new Thread(() -> demo.consume(id), "C-" + i).start();
}
}
public void produce(int id) {
for (int i = 1; i <= 5; i++) {
try {
queue.put(i); // 阻塞直到队列不满
System.out.println("生产者 " + id + " 生产: " + i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public void consume(int id) {
for (int i = 0; i < 3; i++) {
try {
int value = queue.take(); // 阻塞直到队列不空
System.out.println("消费者 " + id + " 消费: " + value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}BlockingQueue 方法对照:
| 操作 | 阻塞方法 | 非阻塞方法 |
|---|---|---|
| 入队 | put(e) | offer(e) |
| 出队 | take() | poll() |
| 入队(超时) | - | offer(e, t, u) |
| 出队(超时) | - | poll(t, u) |
方式四:Semaphore
java
public class ProducerConsumerSemaphore {
private static final int CAPACITY = 5;
private final Semaphore mutex = new Semaphore(1); // 互斥
private final Semaphore empty = new Semaphore(CAPACITY); // 空槽
private final Semaphore full = new Semaphore(0); // 满槽
private final Queue<Integer> buffer = new LinkedList<>();
public static void main(String[] args) {
ProducerConsumerSemaphore demo = new ProducerConsumerSemaphore();
for (int i = 1; i <= 2; i++) {
final int id = i;
new Thread(() -> demo.produce(id), "P-" + i).start();
}
for (int i = 1; i <= 3; i++) {
final int id = i;
new Thread(() -> demo.consume(id), "C-" + i).start();
}
}
public void produce(int id) {
for (int i = 1; i <= 5; i++) {
try {
empty.acquire(); // 占用一个空槽
mutex.acquire(); // 进入临界区
buffer.offer(i);
System.out.println("生产者 " + id + " 生产: " + i);
mutex.release();
full.release(); // 增加一个满槽
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
public void consume(int id) {
for (int i = 0; i < 3; i++) {
try {
full.acquire(); // 占用一个满槽
mutex.acquire(); // 进入临界区
int value = buffer.poll();
System.out.println("消费者 " + id + " 消费: " + value);
mutex.release();
empty.release(); // 增加一个空槽
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}选择建议
| 场景 | 推荐 |
|---|---|
| 快速实现 | BlockingQueue |
| 需要灵活控制 | Lock + Condition |
| 学习原理 | synchronized + wait/notify |
| 特殊同步需求 | Semaphore |
常见错误
错误一:用 if 而不是 while
java
// ❌ 错误
if (buffer.isEmpty()) {
buffer.wait(); // 假醒时会继续执行
}
// ✅ 正确
while (buffer.isEmpty()) {
buffer.wait(); // 醒来后重新检查
}错误二:用 notify 而不是 notifyAll
java
// ❌ 危险:只唤醒一个,可能唤醒同类
buffer.notify();
// ✅ 安全:唤醒所有
buffer.notifyAll();错误三:不在 synchronized 里调用 wait
java
// ❌ 抛异常
buffer.wait();
// ✅ 正确
synchronized (buffer) {
buffer.wait();
}总结
- 生产者-消费者模式:解耦生产者和消费者
- 核心问题:队列满/空时的等待
- 首选 BlockingQueue——帮你处理了所有同步细节
- 如果需要更精细的控制,用 Lock + Condition
- 记住用 while 检查条件,用 signalAll 避免饥饿
