Skip to content

Phaser

CountDownLatch 只能一次性使用,CyclicBarrier 虽然可重置但参与者数量固定。

如果你的场景是:参与者数量会变化,且需要多阶段同步,那 Phaser 就是为你准备的。

核心特性

Phaser(阶段管理器)是 Java 7 引入的同步工具:

  1. 动态注册:可以在运行时增减参与者
  2. 多阶段同步:支持多个阶段的同步操作
  3. 相位转换:每个阶段完成后自动进入下一阶段
  4. 可终止:支持主动终止

对比家族成员

工具特点可重用动态参与者
CountDownLatch一等多
CyclicBarrier多互等
Phaser多阶段动态同步

代码演示

基础用法

java
public class PhaserDemo {

    public static void main(String[] args) throws InterruptedException {
        int partyCount = 3;
        Phaser phaser = new Phaser(partyCount);

        System.out.println("开始多阶段任务\n");

        for (int i = 0; i < partyCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                try {
                    // 第一阶段
                    System.out.println("线程 " + threadId + " 执行第一阶段");
                    Thread.sleep((long) (Math.random() * 500 + 200));
                    System.out.println("线程 " + threadId + " 完成第一阶段,等待其他线程");
                    phaser.arriveAndAwaitAdvance();

                    // 第二阶段
                    System.out.println("线程 " + threadId + " 执行第二阶段");
                    Thread.sleep((long) (Math.random() * 500 + 200));
                    System.out.println("线程 " + threadId + " 完成第二阶段,等待其他线程");
                    phaser.arriveAndAwaitAdvance();

                    // 第三阶段
                    System.out.println("线程 " + threadId + " 执行第三阶段");
                    Thread.sleep((long) (Math.random() * 300 + 100));
                    System.out.println("线程 " + threadId + " 完成所有阶段");

                    phaser.arriveAndDeregister();  // 退出

                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }

        Thread.sleep(5000);
        System.out.println("\n所有阶段完成,Phaser 终止");
    }
}

动态注册参与者

java
public class PhaserDynamicRegistration {

    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser();

        // 批量注册
        int initialParties = 2;
        phaser.bulkRegister(initialParties);
        System.out.println("初始参与者数: " + phaser.getRegisteredParties());

        // 动态添加
        phaser.register();
        System.out.println("注册后参与者数: " + phaser.getRegisteredParties());

        for (int i = 0; i < phaser.getRegisteredParties(); i++) {
            final int threadId = i;
            new Thread(() -> {
                System.out.println("线程 " + threadId + " 开始执行");
                phaser.arriveAndAwaitAdvance();
                System.out.println("线程 " + threadId + " 到达屏障");
            }).start();
        }

        Thread.sleep(1000);
        System.out.println("第一阶段完成,相位: " + phaser.getPhase());

        // 取消注册
        phaser.arriveAndDeregister();
        System.out.println("取消注册后参与者数: " + phaser.getRegisteredParties());
    }
}

带回调的多阶段

Phaser 允许我们重写 onAdvance() 方法,在每个阶段完成后执行自定义逻辑:

java
public class PhaserWithCallback {

    public static void main(String[] args) throws InterruptedException {
        int partyCount = 3;
        Phaser phaser = new Phaser(partyCount) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("\n=== 第 " + phase + " 阶段完成回调 ===");
                System.out.println("当前参与者: " + registeredParties);
                // 返回 true 终止 Phaser
                return phase >= 2 || registeredParties == 0;
            }
        };

        for (int i = 0; i < partyCount; i++) {
            final int threadId = i;
            new Thread(() -> {
                for (int phase = 0; phase < 3; phase++) {
                    try {
                        System.out.println("线程 " + threadId + " 执行阶段 " + phase);
                        Thread.sleep((long) (Math.random() * 300 + 100));
                        phaser.arriveAndAwaitAdvance();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                phaser.arriveAndDeregister();
            }).start();
        }

        // 等待 Phaser 终止
        while (!phaser.isTerminated()) {
            Thread.sleep(100);
        }
        System.out.println("\nPhaser 已终止");
    }
}

实际应用:并行测试框架

java
public class ParallelTestPhaser {

    public static void main(String[] args) throws InterruptedException {
        String[] testCases = {"登录测试", "注册测试", "下单测试"};
        Phaser phaser = new Phaser(3);

        System.out.println("=== 并行测试开始 ===\n");

        for (String testCase : testCases) {
            new Thread(new TestCase(testCase, phaser), testCase).start();
        }

        Thread.sleep(5000);
        System.out.println("\n=== 所有测试完成 ===");
    }

    static class TestCase implements Runnable {
        private final String name;
        private final Phaser phaser;

        TestCase(String name, Phaser phaser) {
            this.name = name;
            this.phaser = phaser;
        }

        @Override
        public void run() {
            try {
                // 阶段1: Setup
                System.out.println("[" + name + "] Setup 开始");
                Thread.sleep((long) (Math.random() * 200 + 100));
                System.out.println("[" + name + "] Setup 完成");
                phaser.arriveAndAwaitAdvance();

                // 阶段2: Execute
                System.out.println("[" + name + "] Execute 开始");
                Thread.sleep((long) (Math.random() * 300 + 200));
                System.out.println("[" + name + "] Execute 完成");
                phaser.arriveAndAwaitAdvance();

                // 阶段3: Teardown
                System.out.println("[" + name + "] Teardown 开始");
                Thread.sleep((long) (Math.random() * 100 + 50));
                System.out.println("[" + name + "] Teardown 完成");
                phaser.arriveAndAwaitAdvance();

                phaser.arriveAndDeregister();
                System.out.println("[" + name + "] 测试结束");

            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }
}

等待特定阶段

java
public class PhaserWaitForPhase {

    public static void main(String[] args) throws InterruptedException {
        Phaser phaser = new Phaser(2);

        Thread worker = new Thread(() -> {
            for (int i = 0; i < 3; i++) {
                System.out.println("Worker 执行阶段 " + i);
                try {
                    Thread.sleep(300);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                phaser.arrive();
            }
        }, "Worker");

        worker.start();

        // 等待第一阶段完成
        phaser.awaitAdvance(phaser.getPhase());
        System.out.println("主线程: 第一阶段完成");

        // 等待第二阶段完成
        phaser.awaitAdvance(phaser.getPhase());
        System.out.println("主线程: 第二阶段完成");

        worker.join();
        System.out.println("主线程: 所有阶段完成");
    }
}

方法一览

方法说明
Phaser()创建无参与者的 Phaser
Phaser(int parties)创建指定初始参与者的 Phaser
register()注册一个新参与者
arrive()到达但不等待
arriveAndAwaitAdvance()到达并等待其他参与者
arriveAndDeregister()到达并退出 Phaser
bulkRegister(int parties)批量注册
awaitAdvance(int phase)等待到达指定相位
onAdvance(phase, parties)阶段完成回调(可重写)
isTerminated()检查是否已终止

注意事项

  1. 动态管理:支持运行时注册和取消注册,比 CyclicBarrier 更灵活
  2. onAdvance() 回调:重写此方法可以在每个阶段完成后执行自定义逻辑
  3. Termination 状态:当 onAdvance() 返回 true 或所有参与者都取消注册后,Phaser 进入终止状态
  4. 异常处理:等待中的线程被中断会抛出 InterruptedException

要点回顾

Phaser 适合需要多阶段同步且参与者数量可能动态变化的场景:

  • 并行测试框架(setup → execute → teardown)
  • 多阶段数据处理流水线
  • 动态并行任务

记住:可重置、可动态增减参与者,是 CyclicBarrier 的增强版。

基于 VitePress 构建