阻塞队列
生产者太快,消费者太慢怎么办?
这是并发编程中的经典问题。BlockingQueue 提供了阻塞的入队和出队操作,让生产者和消费者自动协调速度。
什么是阻塞队列
┌──────────────────────────────────────────────────┐
│ BlockingQueue 特性 │
├──────────────────────────────────────────────────┤
│ │
│ 队列满时: │
│ ├─ put(e) → 阻塞,直到队列有空位 │
│ └─ offer() → 返回 false(不阻塞) │
│ │
│ 队列空时: │
│ ├─ take() → 阻塞,直到队列有元素 │
│ └─ poll() → 返回 null(不阻塞) │
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ 生产者:queue.put(item) │ │
│ │ ↓ │ │
│ │ [█████][空位] │ │
│ │ ↓ │ │
│ │ 消费者:queue.take() │ │
│ └─────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────┘方法对照
| 操作 | 抛异常 | 阻塞 | 超时返回 | 返回值 |
|---|---|---|---|---|
| 入队 | add(e) | put(e) | offer(e, time, unit) | true/false |
| 出队 | remove() | take() | poll(time, unit) | null/E |
| 查看 | element() | - | peek() | null/E |
队列家族
BlockingQueue
├── ArrayBlockingQueue(数组,有界 FIFO)
├── LinkedBlockingQueue(链表,可选有界)
├── PriorityBlockingQueue(优先级,无界)
├── DelayQueue(延迟元素)
├── SynchronousQueue(不存储元素)
└── LinkedTransferQueue(可转移)
实现继承:BlockingQueue extends QueueArrayBlockingQueue
固定大小的有界队列:
java
public class ArrayBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 创建容量为 3 的有界队列
BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(3);
// put:队列满时阻塞
queue.put(1);
queue.put(2);
queue.put(3);
System.out.println("队列已满");
// offer:队列满时返回 false
boolean added = queue.offer(4);
System.out.println("offer(4) 结果: " + added); // false
// offer 带超时
added = queue.offer(5, 1, TimeUnit.SECONDS);
System.out.println("offer(5, 1s) 结果: " + added); // false
// take:队列空时阻塞
Integer value = queue.take();
System.out.println("取出: " + value);
}
}LinkedBlockingQueue
可选有界(默认 Integer.MAX_VALUE)的链表队列:
java
public class LinkedBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 无界(实际上有上限)
BlockingQueue<String> unbounded = new LinkedBlockingQueue<>();
// 显式指定容量(有界)
BlockingQueue<String> bounded = new LinkedBlockingQueue<>(100);
// 生产者-消费者示例
BlockingQueue<String> queue = new LinkedBlockingQueue<>(10);
// 生产者
Thread producer = new Thread(() -> {
for (int i = 1; i <= 10; i++) {
try {
queue.put("消息-" + i);
System.out.println("生产: 消息-" + i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Producer");
// 消费者
Thread consumer = new Thread(() -> {
for (int i = 0; i < 10; i++) {
try {
String msg = queue.take();
System.out.println("消费: " + msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Consumer");
producer.start();
Thread.sleep(100); // 确保生产者先开始
consumer.start();
}
}LinkedBlockingQueue vs ArrayBlockingQueue
| 特性 | ArrayBlockingQueue | LinkedBlockingQueue |
|---|---|---|
| 底层结构 | 数组 | 链表 |
| 容量 | 必须指定 | 可选,默认 Integer.MAX_VALUE |
| 锁 | 一把锁 | 两把锁(入队/出队分离) |
| 吞吐量 | 较低 | 较高 |
| 内存 | 固定 | 动态分配 |
选择建议:LinkedBlockingQueue 通常性能更好,除非需要严格的队列大小限制。
PriorityBlockingQueue
基于堆的优先级队列,无界:
java
public class PriorityBlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Task> queue = new PriorityBlockingQueue<>();
queue.put(new Task("紧急", 1));
queue.put(new Task("普通", 3));
queue.put(new Task("重要", 2));
// 按优先级出队
while (!queue.isEmpty()) {
Task task = queue.take();
System.out.println("执行: " + task.name + " (优先级:" + task.priority + ")");
}
}
static class Task implements Comparable<Task> {
String name;
int priority;
Task(String name, int priority) {
this.name = name;
this.priority = priority;
}
@Override
public int compareTo(Task o) {
return Integer.compare(this.priority, o.priority); // 数字越小优先级越高
}
}
}SynchronousQueue
不存储元素的队列,每个 put 必须等一个 take:
java
public class SynchronousQueueDemo {
public static void main(String[] args) throws InterruptedException {
BlockingQueue<Integer> queue = new SynchronousQueue<>();
// 生产者
Thread producer = new Thread(() -> {
try {
System.out.println("生产者: 等待消费者...");
queue.put(42); // 阻塞,直到有消费者 take
System.out.println("生产者: 发送完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Producer");
// 消费者
Thread consumer = new Thread(() -> {
try {
Thread.sleep(1000);
System.out.println("消费者: 准备接收...");
Integer value = queue.take();
System.out.println("消费者: 收到 " + value);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "Consumer");
producer.start();
consumer.start();
}
}SynchronousQueue 的特点:
- 不存储元素
- 每个 put 必须等一个 take,反之亦然
- 适合线程间直接交接
经典模式:生产者-消费者
java
public class ProducerConsumerPattern {
private static final BlockingQueue<Task> taskQueue =
new LinkedBlockingQueue<>(100);
public static void main(String[] args) {
// 启动 5 个消费者
for (int i = 0; i < 5; i++) {
final int id = i;
new Thread(() -> {
while (true) {
try {
Task task = taskQueue.take(); // 队列空时阻塞
System.out.println("消费者" + id + " 处理: " + task);
Thread.sleep(500); // 模拟处理
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}).start();
}
// 模拟不断产生任务
new Thread(() -> {
for (int i = 0; i < 20; i++) {
try {
taskQueue.put(new Task("任务-" + i, i % 3));
System.out.println("生产者: 添加任务-" + i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}).start();
}
static class Task {
String name;
int priority;
Task(String name, int priority) {
this.name = name;
this.priority = priority;
}
@Override
public String toString() { return name; }
}
}注意事项
- 队列满时 put() 阻塞:如果不需要阻塞,用 offer()
- 队列空时 take() 阻塞:如果不需要阻塞,用 poll()
- 避免 offer()/poll() 与 put()/take() 混用:
- 混用可能导致死锁或超时失效
- LinkedBlockingQueue 默认无界:不限制大小可能导致 OOM
要点回顾
- BlockingQueue 提供阻塞的入队/出队操作
- put()/take() 阻塞等待,offer()/poll() 不阻塞
- ArrayBlockingQueue:有界,性能一般
- LinkedBlockingQueue:可选有界,性能更好
- PriorityBlockingQueue:按优先级排序
- SynchronousQueue:不存储元素,直接交接
相关链接
- ConcurrentQueue - 非阻塞队列
- DelayQueue - 延迟队列
- JUC 集合选择 - 根据场景选择集合
