Skip to content

阻塞队列

生产者太快,消费者太慢怎么办?

这是并发编程中的经典问题。BlockingQueue 提供了阻塞的入队和出队操作,让生产者和消费者自动协调速度。

什么是阻塞队列

┌──────────────────────────────────────────────────┐
│              BlockingQueue 特性                    │
├──────────────────────────────────────────────────┤
│                                                  │
│  队列满时:                                       │
│  ├─ put(e)  → 阻塞,直到队列有空位                 │
│  └─ offer() → 返回 false(不阻塞)               │
│                                                  │
│  队列空时:                                       │
│  ├─ take()  → 阻塞,直到队列有元素                 │
│  └─ poll()  → 返回 null(不阻塞)                 │
│                                                  │
│  ┌─────────────────────────────────────────┐   │
│  │  生产者:queue.put(item)                 │   │
│  │       ↓                                  │   │
│  │  [█████][空位]                          │   │
│  │       ↓                                  │   │
│  │  消费者:queue.take()                    │   │
│  └─────────────────────────────────────────┘   │
│                                                  │
└──────────────────────────────────────────────────┘

方法对照

操作抛异常阻塞超时返回返回值
入队add(e)put(e)offer(e, time, unit)true/false
出队remove()take()poll(time, unit)null/E
查看element()-peek()null/E

队列家族

BlockingQueue
├── ArrayBlockingQueue(数组,有界 FIFO)
├── LinkedBlockingQueue(链表,可选有界)
├── PriorityBlockingQueue(优先级,无界)
├── DelayQueue(延迟元素)
├── SynchronousQueue(不存储元素)
└── LinkedTransferQueue(可转移)

实现继承:BlockingQueue extends Queue

ArrayBlockingQueue

固定大小的有界队列:

java
public class ArrayBlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        // 创建容量为 3 的有界队列
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);

        // put:队列满时阻塞
        queue.put(1);
        queue.put(2);
        queue.put(3);
        System.out.println("队列已满");

        // offer:队列满时返回 false
        boolean added = queue.offer(4);
        System.out.println("offer(4) 结果: " + added);  // false

        // offer 带超时
        added = queue.offer(5, 1, TimeUnit.SECONDS);
        System.out.println("offer(5, 1s) 结果: " + added);  // false

        // take:队列空时阻塞
        Integer value = queue.take();
        System.out.println("取出: " + value);
    }
}

LinkedBlockingQueue

可选有界(默认 Integer.MAX_VALUE)的链表队列:

java
public class LinkedBlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        // 无界(实际上有上限)
        BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();

        // 显式指定容量(有界)
        BlockingQueue<String> bounded = new LinkedBlockingQueue<>(100);

        // 生产者-消费者示例
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);

        // 生产者
        Thread producer = new Thread(() -> {
            for (int i = 1; i <= 10; i++) {
                try {
                    queue.put("消息-" + i);
                    System.out.println("生产: 消息-" + i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "Producer");

        // 消费者
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                try {
                    String msg = queue.take();
                    System.out.println("消费: " + msg);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "Consumer");

        producer.start();
        Thread.sleep(100);  // 确保生产者先开始
        consumer.start();
    }
}

LinkedBlockingQueue vs ArrayBlockingQueue

特性ArrayBlockingQueueLinkedBlockingQueue
底层结构数组链表
容量必须指定可选,默认 Integer.MAX_VALUE
一把锁两把锁(入队/出队分离)
吞吐量较低较高
内存固定动态分配

选择建议:LinkedBlockingQueue 通常性能更好,除非需要严格的队列大小限制。

PriorityBlockingQueue

基于堆的优先级队列,无界:

java
public class PriorityBlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Task> queue = new PriorityBlockingQueue<>();

        queue.put(new Task("紧急", 1));
        queue.put(new Task("普通", 3));
        queue.put(new Task("重要", 2));

        // 按优先级出队
        while (!queue.isEmpty()) {
            Task task = queue.take();
            System.out.println("执行: " + task.name + " (优先级:" + task.priority + ")");
        }
    }

    static class Task implements Comparable<Task> {
        String name;
        int priority;

        Task(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }

        @Override
        public int compareTo(Task o) {
            return Integer.compare(this.priority, o.priority);  // 数字越小优先级越高
        }
    }
}

SynchronousQueue

不存储元素的队列,每个 put 必须等一个 take:

java
public class SynchronousQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new SynchronousQueue<>();

        // 生产者
        Thread producer = new Thread(() -> {
            try {
                System.out.println("生产者: 等待消费者...");
                queue.put(42);  // 阻塞,直到有消费者 take
                System.out.println("生产者: 发送完成");
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Producer");

        // 消费者
        Thread consumer = new Thread(() -> {
            try {
                Thread.sleep(1000);
                System.out.println("消费者: 准备接收...");
                Integer value = queue.take();
                System.out.println("消费者: 收到 " + value);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "Consumer");

        producer.start();
        consumer.start();
    }
}

SynchronousQueue 的特点:

  • 不存储元素
  • 每个 put 必须等一个 take,反之亦然
  • 适合线程间直接交接

经典模式:生产者-消费者

java
public class ProducerConsumerPattern {

    private static final BlockingQueue<Task> taskQueue =
        new LinkedBlockingQueue<>(100);

    public static void main(String[] args) {
        // 启动 5 个消费者
        for (int i = 0; i < 5; i++) {
            final int id = i;
            new Thread(() -> {
                while (true) {
                    try {
                        Task task = taskQueue.take();  // 队列空时阻塞
                        System.out.println("消费者" + id + " 处理: " + task);
                        Thread.sleep(500);  // 模拟处理
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }).start();
        }

        // 模拟不断产生任务
        new Thread(() -> {
            for (int i = 0; i < 20; i++) {
                try {
                    taskQueue.put(new Task("任务-" + i, i % 3));
                    System.out.println("生产者: 添加任务-" + i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }).start();
    }

    static class Task {
        String name;
        int priority;
        Task(String name, int priority) {
            this.name = name;
            this.priority = priority;
        }
        @Override
        public String toString() { return name; }
    }
}

注意事项

  1. 队列满时 put() 阻塞:如果不需要阻塞,用 offer()
  2. 队列空时 take() 阻塞:如果不需要阻塞,用 poll()
  3. 避免 offer()/poll() 与 put()/take() 混用
    • 混用可能导致死锁或超时失效
  4. LinkedBlockingQueue 默认无界:不限制大小可能导致 OOM

要点回顾

  • BlockingQueue 提供阻塞的入队/出队操作
  • put()/take() 阻塞等待,offer()/poll() 不阻塞
  • ArrayBlockingQueue:有界,性能一般
  • LinkedBlockingQueue:可选有界,性能更好
  • PriorityBlockingQueue:按优先级排序
  • SynchronousQueue:不存储元素,直接交接

相关链接

基于 VitePress 构建