Skip to content

生产者-消费者模式

最经典的线程协作模式。

先看问题

           缓冲区
生产者 ──────► [ ][ ][ ][ ][ ] ──────► 消费者
   放数据                      取数据
  • 生产者:制造数据,放到缓冲区
  • 消费者:从缓冲区取数据,消费掉
  • 缓冲区:平衡两者的速度差

核心问题:

  1. 缓冲区满了,生产者要等
  2. 缓冲区空了,消费者要等
  3. 多个线程同时操作缓冲区,必须线程安全

实现方式总览

方式代码量功能推荐度
synchronized + wait/notify基础⭐⭐⭐
Lock + Condition灵活⭐⭐⭐⭐
BlockingQueue开箱即用⭐⭐⭐⭐⭐
Semaphore底层控制⭐⭐

方式一:synchronized + wait/notify

java
public class ProducerConsumerSync {

    private static final int CAPACITY = 5;
    private final Queue<Integer> buffer = new LinkedList<>();

    public static void main(String[] args) {
        ProducerConsumerSync demo = new ProducerConsumerSync();
        // 2 个生产者
        for (int i = 1; i <= 2; i++) {
            final int id = i;
            new Thread(() -> demo.produce(id), "P-" + i).start();
        }
        // 3 个消费者
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            new Thread(() -> demo.consume(id), "C-" + i).start();
        }
    }

    public void produce(int id) {
        for (int i = 1; i <= 5; i++) {
            synchronized (buffer) {
                while (buffer.size() >= CAPACITY) {
                    try {
                        buffer.wait();  // 队列满,等
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                buffer.offer(i);
                System.out.println("生产者 " + id + " 生产: " + i);
                buffer.notifyAll();  // 通知消费者
            }
            try {
                Thread.sleep((long) (Math.random() * 200 + 100));
            } catch (InterruptedException e) {}
        }
    }

    public void consume(int id) {
        for (int i = 0; i < 3; i++) {
            synchronized (buffer) {
                while (buffer.isEmpty()) {
                    try {
                        buffer.wait();  // 队列空,等
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
                int value = buffer.poll();
                System.out.println("消费者 " + id + " 消费: " + value);
                buffer.notifyAll();  // 通知生产者
            }
            try {
                Thread.sleep((long) (Math.random() * 300 + 200));
            } catch (InterruptedException e) {}
        }
    }
}

方式二:Lock + Condition

java
public class ProducerConsumerLock {

    private static final int CAPACITY = 5;
    private final Queue<Integer> buffer = new LinkedList<>();

    private final Lock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public static void main(String[] args) {
        ProducerConsumerLock demo = new ProducerConsumerLock();
        for (int i = 1; i <= 2; i++) {
            final int id = i;
            new Thread(() -> demo.produce(id), "P-" + i).start();
        }
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            new Thread(() -> demo.consume(id), "C-" + i).start();
        }
    }

    public void produce(int id) {
        for (int i = 1; i <= 5; i++) {
            lock.lock();
            try {
                while (buffer.size() >= CAPACITY) {
                    notFull.await();  // 等「不满」
                }
                buffer.offer(i);
                System.out.println("生产者 " + id + " 生产: " + i);
                notEmpty.signal();  // 通知「非空」
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }
    }

    public void consume(int id) {
        for (int i = 0; i < 3; i++) {
            lock.lock();
            try {
                while (buffer.isEmpty()) {
                    notEmpty.await();  // 等「非空」
                }
                int value = buffer.poll();
                System.out.println("消费者 " + id + " 消费: " + value);
                notFull.signal();  // 通知「不满」
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                lock.unlock();
            }
        }
    }
}

方式三:BlockingQueue(推荐)

最简洁,生产者和消费者完全解耦:

java
public class ProducerConsumerBQ {

    private final BlockingQueue<Integer> queue;

    public ProducerConsumerBQ() {
        this.queue = new LinkedBlockingQueue<>(5);  // 有界队列
    }

    public static void main(String[] args) {
        ProducerConsumerBQ demo = new ProducerConsumerBQ();
        for (int i = 1; i <= 2; i++) {
            final int id = i;
            new Thread(() -> demo.produce(id), "P-" + i).start();
        }
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            new Thread(() -> demo.consume(id), "C-" + i).start();
        }
    }

    public void produce(int id) {
        for (int i = 1; i <= 5; i++) {
            try {
                queue.put(i);  // 阻塞直到队列不满
                System.out.println("生产者 " + id + " 生产: " + i);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void consume(int id) {
        for (int i = 0; i < 3; i++) {
            try {
                int value = queue.take();  // 阻塞直到队列不空
                System.out.println("消费者 " + id + " 消费: " + value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

BlockingQueue 方法对照

操作阻塞方法非阻塞方法
入队put(e)offer(e)
出队take()poll()
入队(超时)-offer(e, t, u)
出队(超时)-poll(t, u)

方式四:Semaphore

java
public class ProducerConsumerSemaphore {

    private static final int CAPACITY = 5;
    private final Semaphore mutex = new Semaphore(1);  // 互斥
    private final Semaphore empty = new Semaphore(CAPACITY);  // 空槽
    private final Semaphore full = new Semaphore(0);  // 满槽

    private final Queue<Integer> buffer = new LinkedList<>();

    public static void main(String[] args) {
        ProducerConsumerSemaphore demo = new ProducerConsumerSemaphore();
        for (int i = 1; i <= 2; i++) {
            final int id = i;
            new Thread(() -> demo.produce(id), "P-" + i).start();
        }
        for (int i = 1; i <= 3; i++) {
            final int id = i;
            new Thread(() -> demo.consume(id), "C-" + i).start();
        }
    }

    public void produce(int id) {
        for (int i = 1; i <= 5; i++) {
            try {
                empty.acquire();  // 占用一个空槽
                mutex.acquire();  // 进入临界区
                buffer.offer(i);
                System.out.println("生产者 " + id + " 生产: " + i);
                mutex.release();
                full.release();  // 增加一个满槽
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void consume(int id) {
        for (int i = 0; i < 3; i++) {
            try {
                full.acquire();  // 占用一个满槽
                mutex.acquire();  // 进入临界区
                int value = buffer.poll();
                System.out.println("消费者 " + id + " 消费: " + value);
                mutex.release();
                empty.release();  // 增加一个空槽
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

选择建议

场景推荐
快速实现BlockingQueue
需要灵活控制Lock + Condition
学习原理synchronized + wait/notify
特殊同步需求Semaphore

常见错误

错误一:用 if 而不是 while

java
// ❌ 错误
if (buffer.isEmpty()) {
    buffer.wait();  // 假醒时会继续执行
}

// ✅ 正确
while (buffer.isEmpty()) {
    buffer.wait();  // 醒来后重新检查
}

错误二:用 notify 而不是 notifyAll

java
// ❌ 危险:只唤醒一个,可能唤醒同类
buffer.notify();

// ✅ 安全:唤醒所有
buffer.notifyAll();

错误三:不在 synchronized 里调用 wait

java
// ❌ 抛异常
buffer.wait();

// ✅ 正确
synchronized (buffer) {
    buffer.wait();
}

总结

  • 生产者-消费者模式:解耦生产者和消费者
  • 核心问题:队列满/空时的等待
  • 首选 BlockingQueue——帮你处理了所有同步细节
  • 如果需要更精细的控制,用 Lock + Condition
  • 记住用 while 检查条件,用 signalAll 避免饥饿

基于 VitePress 构建