Skip to content

CyclicBarrier

CountDownLatch 是"一等多"。

但如果线程之间是互相等待呢?

CyclicBarrier 干这个。

核心思想

想象多人游戏开始前的场景:

玩家1 ──► 等待 ──► 等待 ──► ...
玩家2 ──► 等待 ──► 等待 ──► ...
玩家3 ──► 等待 ──► 等待 ──► ...

         所有人都到达 ──► 游戏开始!

每个玩家都要等待其他玩家,所有人到达后才能继续。

这就是 CyclicBarrier 的本质。

CyclicBarrier vs CountDownLatch

特性CyclicBarrierCountDownLatch
重置可重置不可重置
等待方向线程互相等待主线程等待子线程
触发时机达到 N 个 await()计数器减到 0
计数方向增加到 N减到 0

代码演示

基础用法

java
public class CyclicBarrierDemo {

    public static void main(String[] args) {
        int partyCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(partyCount);

        System.out.println("开始多线程分阶段执行\n");

        for (int i = 0; i < partyCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    // 第一阶段
                    System.out.println("线程 " + threadId + " 执行第一阶段");
                    Thread.sleep((long) (Math.random() * 1000 + 500));
                    System.out.println("线程 " + threadId + " 到达屏障1,等待其他线程");
                    barrier.await();

                    // 第二阶段(所有线程同时开始)
                    System.out.println("线程 " + threadId + " 开始执行第二阶段");
                    Thread.sleep((long) (Math.random() * 1000 + 500));
                    System.out.println("线程 " + threadId + " 到达屏障2,等待其他线程");
                    barrier.await();

                    // 第三阶段
                    System.out.println("线程 " + threadId + " 开始执行第三阶段");
                    Thread.sleep((long) (Math.random() * 500 + 200));
                    System.out.println("线程 " + threadId + " 执行完毕");
                    barrier.await();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

带屏障动作

java
public class CyclicBarrierWithAction {

    public static void main(String[] args) {
        int partyCount = 3;
        CyclicBarrier barrier = new CyclicBarrier(partyCount, () -> {
            System.out.println("\n>>> 屏障动作执行:所有线程已到达,准备进入下一阶段");
        });

        for (int i = 0; i < partyCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    // 阶段1:数据准备
                    System.out.println("线程 " + threadId + " 准备数据...");
                    Thread.sleep((long) (Math.random() * 500 + 300));
                    System.out.println("线程 " + threadId + " 数据准备完成");
                    barrier.await();

                    // 阶段2:数据处理
                    System.out.println("线程 " + threadId + " 处理数据...");
                    Thread.sleep((long) (Math.random() * 500 + 300));
                    System.out.println("线程 " + threadId + " 数据处理完成");
                    barrier.await();

                    // 阶段3:结果汇总
                    System.out.println("线程 " + threadId + " 汇总结果...");
                    Thread.sleep((long) (Math.random() * 300 + 100));
                    System.out.println("线程 " + threadId + " 汇总完成");
                    barrier.await();

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

实际应用:并行排序

java
public class ParallelSortDemo {

    public static void main(String[] args) throws InterruptedException {
        int[] array = {5, 2, 8, 1, 9, 3, 7, 4, 6};
        int threadCount = 3;
        int chunkSize = (int) Math.ceil((double) array.length / threadCount);

        CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
            System.out.println("所有分片排序完成,合并结果");
        });

        int[][] chunks = new int[threadCount][];
        for (int i = 0; i < threadCount; i++) {
            int start = i * chunkSize;
            int end = Math.min(start + chunkSize, array.length);
            chunks[i] = Arrays.copyOfRange(array, start, end);
        }

        Thread[] threads = new Thread[threadCount];
        for (int i = 0; i < threadCount; i++) {
            final int[] chunk = chunks[i];
            final int threadId = i;

            threads[i] = new Thread(() -> {
                try {
                    // 排序阶段
                    System.out.println("线程 " + threadId + " 排序: " + Arrays.toString(chunk));
                    Arrays.sort(chunk);
                    System.out.println("线程 " + threadId + " 排序完成: " + Arrays.toString(chunk));
                    barrier.await();

                    // 合并阶段
                    System.out.println("线程 " + threadId + " 参与合并");

                } catch (Exception e) {
                    e.printStackTrace();
                }
            });
            threads[i].start();
        }

        for (Thread t : threads) {
            t.join();
        }

        // 合并结果
        int[] result = Arrays.stream(chunks)
                .flatMapToInt(Arrays::stream)
                .sorted()
                .toArray();
        System.out.println("\n最终排序结果: " + Arrays.toString(result));
    }
}

循环使用

java
public class CyclicBarrierCyclicUse {

    public static void main(String[] args) throws InterruptedException {
        int partyCount = 3;
        int cycleCount = 2;
        CyclicBarrier barrier = new CyclicBarrier(partyCount);

        for (int i = 0; i < partyCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    for (int cycle = 1; cycle <= cycleCount; cycle++) {
                        System.out.println("线程 " + threadId + " 完成第 " + cycle + " 轮");
                        Thread.sleep((long) (Math.random() * 300 + 100));
                        barrier.await();
                        System.out.println("线程 " + threadId + " 进入第 " + cycle + " 轮下一阶段");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
}

方法一览

方法说明
CyclicBarrier(int parties)构造方法,指定参与线程数
CyclicBarrier(int parties, Runnable action)指定参与线程数和屏障动作
await()等待所有线程到达屏障
await(long timeout, TimeUnit)等待指定时间
reset()重置屏障到初始状态
getNumberWaiting()获取当前正在等待的线程数
isBroken()检查屏障是否被破坏

注意事项

  1. BrokenBarrierException:如果一个线程在等待时被中断,或 barrier 被 reset,其他等待的线程会抛出 BrokenBarrierException
  2. 屏障重置reset() 会导致正在等待的线程抛出 BrokenBarrierException
  3. 异常处理:等待中的线程如果被中断会抛出 InterruptedException
  4. 线程数量必须匹配:构造函数中的 party 数量应与实际调用 await() 的线程数匹配
java
// 处理 BrokenBarrierException
CyclicBarrier barrier = new CyclicBarrier(3);

for (int i = 0; i < 3; i++) {
    new Thread(() -> {
        try {
            barrier.await();
        } catch (BrokenBarrierException e) {
            System.out.println("屏障已损坏");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }).start();
}

要点回顾

CyclicBarrier 适用于"多互等"的场景:

  • 多线程分阶段执行
  • 多人游戏准备开始
  • 并行计算合并结果

记住:可重置,可以循环使用。

基于 VitePress 构建