Skip to content

ConcurrentQueue

不需要阻塞操作的并发队列?

ConcurrentLinkedQueueConcurrentLinkedDeque 是高性能的无锁队列,适用于高并发场景。

队列家族

┌──────────────────────────────────────────────────┐
│                  JUC 队列分类                      │
├──────────────────────────────────────────────────┤
│                                                  │
│  Queue<E>                                        │
│  ├── BlockingQueue<E>                             │
│  │     ├── ArrayBlockingQueue(有界 FIFO)          │
│  │     ├── LinkedBlockingQueue(可选有界)          │
│  │     ├── PriorityBlockingQueue(优先级)          │
│  │     ├── DelayQueue(延迟)                       │
│  │     └── SynchronousQueue(不存储元素)            │
│  │                                              │
│  └── ConcurrentLinkedQueue<E>(无界 FIFO)         │
│      └── ConcurrentLinkedDeque<E>(无界双端)       │
│                                                  │
└──────────────────────────────────────────────────┘

BlockingQueue 支持阻塞操作(put/take),ConcurrentQueue 不支持阻塞。

ConcurrentLinkedQueue

高性能的无界 FIFO 队列,使用 CAS 实现无锁:

java
public class ConcurrentQueueDemo {

    private static final ConcurrentLinkedQueue<String> queue =
        new ConcurrentLinkedQueue<>();

    public static void main(String[] args) {
        // 入队(add/offer 等价)
        queue.offer("元素1");
        queue.offer("元素2");
        queue.add("元素3");  // 等价于 offer

        System.out.println("队列大小: " + queue.size());

        // 出队(非阻塞)
        String elem = queue.poll();  // null(队列空时)
        System.out.println("取出: " + elem);

        // 查看队首(非阻塞)
        String peek = queue.peek();  // null(队列空时)
        System.out.println("查看: " + peek);
    }
}

方法对照

操作抛异常返回特殊值阻塞超时
入队add(e)offer(e)put(e)offer(e, time, unit)
出队remove()poll()take()poll(time, unit)
查看element()peek()--

ConcurrentLinkedQueue 不支持阻塞操作(put/take),只能用 offer/poll。

多线程安全操作

java
public class MultiThreadQueueDemo {

    private static final ConcurrentLinkedQueue<Integer> queue =
        new ConcurrentLinkedQueue<>();
    private static final int THREAD_COUNT = 10;

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(THREAD_COUNT * 2);

        // 10 个生产者线程
        for (int i = 0; i < THREAD_COUNT; i++) {
            final int id = i;
            new Thread(() -> {
                for (int j = 0; j < 100; j++) {
                    queue.offer(id * 100 + j);
                }
                latch.countDown();
            }).start();
        }

        // 10 个消费者线程
        for (int i = 0; i < THREAD_COUNT; i++) {
            new Thread(() -> {
                while (!queue.isEmpty() || latch.getCount() > 0) {
                    Integer value = queue.poll();
                    if (value != null) {
                        // 处理元素
                    }
                }
                latch.countDown();
            }).start();
        }

        latch.await();
        System.out.println("最终队列大小: " + queue.size());
    }
}

ConcurrentLinkedDeque

双端队列,支持从两端入队/出队:

java
public class DequeDemo {

    private static final ConcurrentLinkedDeque<String> deque =
        new ConcurrentLinkedDeque<>();

    public static void main(String[] args) {
        // 从头部操作
        deque.addFirst("A");
        deque.addFirst("B");
        System.out.println("addFirst 后: " + deque);  // [B, A]

        // 从尾部操作
        deque.addLast("C");
        deque.addLast("D");
        System.out.println("addLast 后: " + deque);  // [B, A, C, D]

        // 从头部取出
        String first = deque.removeFirst();
        System.out.println("removeFirst: " + first);  // B

        // 从尾部取出
        String last = deque.removeLast();
        System.out.println("removeLast: " + last);   // D
    }
}

与 BlockingQueue 对比

特性ConcurrentLinkedQueueBlockingQueue
阻塞操作❌ 不支持✅ 支持(put/take)
null 元素❌ 不允许❌ 不允许
size()不精确精确
迭代器弱一致性弱一致性
性能极高(无锁)中等(有锁)
适用场景高性能非阻塞生产者-消费者

性能特点

ConcurrentLinkedQueue 性能:
┌──────────────────────────────────────────────────┐
│  适用场景:                                        │
│  - 非阻塞生产者-消费者                             │
│  - 高吞吐量需求                                    │
│  - 不需要等待队列有空/满                           │
│                                                  │
│  性能优势:                                        │
│  - 无锁算法,比 BlockingQueue 快 3-10 倍           │
│  - 无阻塞开销                                      │
│  - 适合纯并发操作                                  │
│                                                  │
│  性能劣势:                                        │
│  - size() 不精确(需要遍历)                       │
│  - 无法等待队列为空/满                             │
└──────────────────────────────────────────────────┘

常见错误

错误一:在多线程中混用 iterator

java
// ❌ 危险:iterator 是弱一致的
ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
for (Integer item : queue) {  // 迭代过程中修改可能看不到
    // ...
}

// ✅ 正确:使用 poll 遍历
Integer item;
while ((item = queue.poll()) != null) {
    // 处理每个元素
}

错误二:依赖 size() 做判断

java
// ❌ 错误:size() 不精确
while (queue.size() > 0) {
    Integer item = queue.poll();
    // ...
}

// ✅ 正确:直接 poll 直到返回 null
while (queue.poll() != null) {
    // ...
}

错误三:offer 后立即 size() > 0

java
// ❌ 危险:size() 可能不同步
queue.offer("item");
if (queue.size() > 0) {  // size() 可能还没更新
    // ...
}

// ✅ 正确:用 poll 判断
if (queue.offer("item")) {
    // offer 返回 true 表示成功
}

适用场景

ConcurrentLinkedQueue 适合

  • 高性能非阻塞生产者-消费者
  • 不需要等待队列为空/满的操作
  • 消息队列、任务队列(无需阻塞的场景)
  • 高并发计数

需要用 BlockingQueue 的场景

  • 需要 put/take 的阻塞语义
  • 生产者需要等待队列不满
  • 消费者需要等待队列不空

要点回顾

  • ConcurrentLinkedQueue 是无锁无界 FIFO 队列
  • 不支持阻塞操作(put/take)
  • offer/poll 不阻塞,返回 true/false 或 null
  • size() 不精确,迭代器弱一致
  • 适合高性能非阻塞场景

相关链接

基于 VitePress 构建