Skip to content

BlockingQueue:生产者-消费者模式的核心

BlockingQueue 是什么

BlockingQueue 是支持阻塞操作的队列:

java
BlockingQueue<Task> queue = new LinkedBlockingQueue<>(100);

// 生产者线程
queue.put(task); // 队列满时阻塞

// 消费者线程
Task task = queue.take(); // 队列空时阻塞
  • 队列满 → put() 阻塞
  • 队列空 → take() 阻塞

这就是实现生产者-消费者模式的核心。


四组 API

操作抛异常特殊值阻塞超时
入队add(e)offer(e)put(e)offer(e, time, unit)
出队remove()poll()take()poll(time, unit)
检查element()peek()

队列满时的行为

  • add(e) → 抛 IllegalStateException
  • offer(e) → 返回 false
  • put(e)阻塞等待
  • offer(e, 5, TimeUnit.SECONDS)阻塞 5 秒,超时返回 false

队列空时的行为

  • remove() → 抛 NoSuchElementException
  • poll() → 返回 null
  • take()阻塞等待
  • poll(5, TimeUnit.SECONDS)阻塞 5 秒,超时返回 null

四种实现

LinkedBlockingQueue

java
// 无界(默认 Integer.MAX_VALUE)
BlockingQueue<Task> unbounded = new LinkedBlockingQueue<>();

// 有界
BlockingQueue<Task> bounded = new LinkedBlockingQueue<>(100);

基于链表实现,性能好,是最常用的 BlockingQueue。

ArrayBlockingQueue

java
// 有界队列
BlockingQueue<Task> bounded = new ArrayBlockingQueue<>(100);

基于数组实现,必须指定容量。

PriorityBlockingQueue

java
BlockingQueue<Task> priority = new PriorityBlockingQueue<>();

按优先级排序,不是 FIFO。

SynchronousQueue

java
BlockingQueue<Task> sync = new SynchronousQueue<>();

容量为 0,每个 put 必须等待一个 take。


生产者-消费者示例

java
BlockingQueue<Task> queue = new LinkedBlockingQueue<>(100);

// 生产者线程
public class Producer implements Runnable {
    public void run() {
        while (hasMoreTasks()) {
            Task task = generateTask();
            try {
                queue.put(task); // 队列满时阻塞
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

// 消费者线程
public class Consumer implements Runnable {
    public void run() {
        while (true) {
            try {
                Task task = queue.take(); // 队列空时阻塞
                process(task);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}

线程池的拒绝策略

java
// 线程池使用 BlockingQueue 存储任务
ExecutorService pool = new ThreadPoolExecutor(
    4, 8, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(100)
);
// 队列满时,RejectedExecutionHandler 处理拒绝策略

总结

要点说明
put/take阻塞直到成功
offer/poll非阻塞或超时阻塞
LinkedBlockingQueue最常用,无界或有界
SynchronousQueue容量为 0,每个 put 等待一个 take

一句话:BlockingQueue 是生产者-消费者模式的「传输带」——队列满了就停下,队列空了就等待。


相关链接

基于 VitePress 构建