ConcurrentQueue
不需要阻塞操作的并发队列?
ConcurrentLinkedQueue 和 ConcurrentLinkedDeque 是高性能的无锁队列,适用于高并发场景。
队列家族
┌──────────────────────────────────────────────────┐
│ JUC 队列分类 │
├──────────────────────────────────────────────────┤
│ │
│ Queue<E> │
│ ├── BlockingQueue<E> │
│ │ ├── ArrayBlockingQueue(有界 FIFO) │
│ │ ├── LinkedBlockingQueue(可选有界) │
│ │ ├── PriorityBlockingQueue(优先级) │
│ │ ├── DelayQueue(延迟) │
│ │ └── SynchronousQueue(不存储元素) │
│ │ │
│ └── ConcurrentLinkedQueue<E>(无界 FIFO) │
│ └── ConcurrentLinkedDeque<E>(无界双端) │
│ │
└──────────────────────────────────────────────────┘BlockingQueue 支持阻塞操作(put/take),ConcurrentQueue 不支持阻塞。
ConcurrentLinkedQueue
高性能的无界 FIFO 队列,使用 CAS 实现无锁:
java
public class ConcurrentQueueDemo {
private static final ConcurrentLinkedQueue<String> queue =
new ConcurrentLinkedQueue<>();
public static void main(String[] args) {
// 入队(add/offer 等价)
queue.offer("元素1");
queue.offer("元素2");
queue.add("元素3"); // 等价于 offer
System.out.println("队列大小: " + queue.size());
// 出队(非阻塞)
String elem = queue.poll(); // null(队列空时)
System.out.println("取出: " + elem);
// 查看队首(非阻塞)
String peek = queue.peek(); // null(队列空时)
System.out.println("查看: " + peek);
}
}方法对照
| 操作 | 抛异常 | 返回特殊值 | 阻塞 | 超时 |
|---|---|---|---|---|
| 入队 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
| 出队 | remove() | poll() | take() | poll(time, unit) |
| 查看 | element() | peek() | - | - |
ConcurrentLinkedQueue 不支持阻塞操作(put/take),只能用 offer/poll。
多线程安全操作
java
public class MultiThreadQueueDemo {
private static final ConcurrentLinkedQueue<Integer> queue =
new ConcurrentLinkedQueue<>();
private static final int THREAD_COUNT = 10;
public static void main(String[] args) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(THREAD_COUNT * 2);
// 10 个生产者线程
for (int i = 0; i < THREAD_COUNT; i++) {
final int id = i;
new Thread(() -> {
for (int j = 0; j < 100; j++) {
queue.offer(id * 100 + j);
}
latch.countDown();
}).start();
}
// 10 个消费者线程
for (int i = 0; i < THREAD_COUNT; i++) {
new Thread(() -> {
while (!queue.isEmpty() || latch.getCount() > 0) {
Integer value = queue.poll();
if (value != null) {
// 处理元素
}
}
latch.countDown();
}).start();
}
latch.await();
System.out.println("最终队列大小: " + queue.size());
}
}ConcurrentLinkedDeque
双端队列,支持从两端入队/出队:
java
public class DequeDemo {
private static final ConcurrentLinkedDeque<String> deque =
new ConcurrentLinkedDeque<>();
public static void main(String[] args) {
// 从头部操作
deque.addFirst("A");
deque.addFirst("B");
System.out.println("addFirst 后: " + deque); // [B, A]
// 从尾部操作
deque.addLast("C");
deque.addLast("D");
System.out.println("addLast 后: " + deque); // [B, A, C, D]
// 从头部取出
String first = deque.removeFirst();
System.out.println("removeFirst: " + first); // B
// 从尾部取出
String last = deque.removeLast();
System.out.println("removeLast: " + last); // D
}
}与 BlockingQueue 对比
| 特性 | ConcurrentLinkedQueue | BlockingQueue |
|---|---|---|
| 阻塞操作 | ❌ 不支持 | ✅ 支持(put/take) |
| null 元素 | ❌ 不允许 | ❌ 不允许 |
| size() | 不精确 | 精确 |
| 迭代器 | 弱一致性 | 弱一致性 |
| 性能 | 极高(无锁) | 中等(有锁) |
| 适用场景 | 高性能非阻塞 | 生产者-消费者 |
性能特点
ConcurrentLinkedQueue 性能:
┌──────────────────────────────────────────────────┐
│ 适用场景: │
│ - 非阻塞生产者-消费者 │
│ - 高吞吐量需求 │
│ - 不需要等待队列有空/满 │
│ │
│ 性能优势: │
│ - 无锁算法,比 BlockingQueue 快 3-10 倍 │
│ - 无阻塞开销 │
│ - 适合纯并发操作 │
│ │
│ 性能劣势: │
│ - size() 不精确(需要遍历) │
│ - 无法等待队列为空/满 │
└──────────────────────────────────────────────────┘常见错误
错误一:在多线程中混用 iterator
java
// ❌ 危险:iterator 是弱一致的
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
for (Integer item : queue) { // 迭代过程中修改可能看不到
// ...
}
// ✅ 正确:使用 poll 遍历
Integer item;
while ((item = queue.poll()) != null) {
// 处理每个元素
}错误二:依赖 size() 做判断
java
// ❌ 错误:size() 不精确
while (queue.size() > 0) {
Integer item = queue.poll();
// ...
}
// ✅ 正确:直接 poll 直到返回 null
while (queue.poll() != null) {
// ...
}错误三:offer 后立即 size() > 0
java
// ❌ 危险:size() 可能不同步
queue.offer("item");
if (queue.size() > 0) { // size() 可能还没更新
// ...
}
// ✅ 正确:用 poll 判断
if (queue.offer("item")) {
// offer 返回 true 表示成功
}适用场景
ConcurrentLinkedQueue 适合:
- 高性能非阻塞生产者-消费者
- 不需要等待队列为空/满的操作
- 消息队列、任务队列(无需阻塞的场景)
- 高并发计数
需要用 BlockingQueue 的场景:
- 需要 put/take 的阻塞语义
- 生产者需要等待队列不满
- 消费者需要等待队列不空
要点回顾
- ConcurrentLinkedQueue 是无锁无界 FIFO 队列
- 不支持阻塞操作(put/take)
- offer/poll 不阻塞,返回 true/false 或 null
- size() 不精确,迭代器弱一致
- 适合高性能非阻塞场景
相关链接
- 阻塞队列 - 支持阻塞操作的队列
- DelayQueue - 延迟队列
- JUC 集合选择 - 根据场景选择集合
