Phaser
CountDownLatch 只能一次性使用,CyclicBarrier 虽然可重置但参与者数量固定。
如果你的场景是:参与者数量会变化,且需要多阶段同步,那 Phaser 就是为你准备的。
核心特性
Phaser(阶段管理器)是 Java 7 引入的同步工具:
- 动态注册:可以在运行时增减参与者
- 多阶段同步:支持多个阶段的同步操作
- 相位转换:每个阶段完成后自动进入下一阶段
- 可终止:支持主动终止
对比家族成员
| 工具 | 特点 | 可重用 | 动态参与者 |
|---|---|---|---|
| CountDownLatch | 一等多 | 否 | 否 |
| CyclicBarrier | 多互等 | 是 | 否 |
| Phaser | 多阶段动态同步 | 是 | 是 |
代码演示
基础用法
java
public class PhaserDemo {
public static void main(String[] args) throws InterruptedException {
int partyCount = 3;
Phaser phaser = new Phaser(partyCount);
System.out.println("开始多阶段任务\n");
for (int i = 0; i < partyCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
// 第一阶段
System.out.println("线程 " + threadId + " 执行第一阶段");
Thread.sleep((long) (Math.random() * 500 + 200));
System.out.println("线程 " + threadId + " 完成第一阶段,等待其他线程");
phaser.arriveAndAwaitAdvance();
// 第二阶段
System.out.println("线程 " + threadId + " 执行第二阶段");
Thread.sleep((long) (Math.random() * 500 + 200));
System.out.println("线程 " + threadId + " 完成第二阶段,等待其他线程");
phaser.arriveAndAwaitAdvance();
// 第三阶段
System.out.println("线程 " + threadId + " 执行第三阶段");
Thread.sleep((long) (Math.random() * 300 + 100));
System.out.println("线程 " + threadId + " 完成所有阶段");
phaser.arriveAndDeregister(); // 退出
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
Thread.sleep(5000);
System.out.println("\n所有阶段完成,Phaser 终止");
}
}动态注册参与者
java
public class PhaserDynamicRegistration {
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser();
// 批量注册
int initialParties = 2;
phaser.bulkRegister(initialParties);
System.out.println("初始参与者数: " + phaser.getRegisteredParties());
// 动态添加
phaser.register();
System.out.println("注册后参与者数: " + phaser.getRegisteredParties());
for (int i = 0; i < phaser.getRegisteredParties(); i++) {
final int threadId = i;
new Thread(() -> {
System.out.println("线程 " + threadId + " 开始执行");
phaser.arriveAndAwaitAdvance();
System.out.println("线程 " + threadId + " 到达屏障");
}).start();
}
Thread.sleep(1000);
System.out.println("第一阶段完成,相位: " + phaser.getPhase());
// 取消注册
phaser.arriveAndDeregister();
System.out.println("取消注册后参与者数: " + phaser.getRegisteredParties());
}
}带回调的多阶段
Phaser 允许我们重写 onAdvance() 方法,在每个阶段完成后执行自定义逻辑:
java
public class PhaserWithCallback {
public static void main(String[] args) throws InterruptedException {
int partyCount = 3;
Phaser phaser = new Phaser(partyCount) {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("\n=== 第 " + phase + " 阶段完成回调 ===");
System.out.println("当前参与者: " + registeredParties);
// 返回 true 终止 Phaser
return phase >= 2 || registeredParties == 0;
}
};
for (int i = 0; i < partyCount; i++) {
final int threadId = i;
new Thread(() -> {
for (int phase = 0; phase < 3; phase++) {
try {
System.out.println("线程 " + threadId + " 执行阶段 " + phase);
Thread.sleep((long) (Math.random() * 300 + 100));
phaser.arriveAndAwaitAdvance();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
phaser.arriveAndDeregister();
}).start();
}
// 等待 Phaser 终止
while (!phaser.isTerminated()) {
Thread.sleep(100);
}
System.out.println("\nPhaser 已终止");
}
}实际应用:并行测试框架
java
public class ParallelTestPhaser {
public static void main(String[] args) throws InterruptedException {
String[] testCases = {"登录测试", "注册测试", "下单测试"};
Phaser phaser = new Phaser(3);
System.out.println("=== 并行测试开始 ===\n");
for (String testCase : testCases) {
new Thread(new TestCase(testCase, phaser), testCase).start();
}
Thread.sleep(5000);
System.out.println("\n=== 所有测试完成 ===");
}
static class TestCase implements Runnable {
private final String name;
private final Phaser phaser;
TestCase(String name, Phaser phaser) {
this.name = name;
this.phaser = phaser;
}
@Override
public void run() {
try {
// 阶段1: Setup
System.out.println("[" + name + "] Setup 开始");
Thread.sleep((long) (Math.random() * 200 + 100));
System.out.println("[" + name + "] Setup 完成");
phaser.arriveAndAwaitAdvance();
// 阶段2: Execute
System.out.println("[" + name + "] Execute 开始");
Thread.sleep((long) (Math.random() * 300 + 200));
System.out.println("[" + name + "] Execute 完成");
phaser.arriveAndAwaitAdvance();
// 阶段3: Teardown
System.out.println("[" + name + "] Teardown 开始");
Thread.sleep((long) (Math.random() * 100 + 50));
System.out.println("[" + name + "] Teardown 完成");
phaser.arriveAndAwaitAdvance();
phaser.arriveAndDeregister();
System.out.println("[" + name + "] 测试结束");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}等待特定阶段
java
public class PhaserWaitForPhase {
public static void main(String[] args) throws InterruptedException {
Phaser phaser = new Phaser(2);
Thread worker = new Thread(() -> {
for (int i = 0; i < 3; i++) {
System.out.println("Worker 执行阶段 " + i);
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
phaser.arrive();
}
}, "Worker");
worker.start();
// 等待第一阶段完成
phaser.awaitAdvance(phaser.getPhase());
System.out.println("主线程: 第一阶段完成");
// 等待第二阶段完成
phaser.awaitAdvance(phaser.getPhase());
System.out.println("主线程: 第二阶段完成");
worker.join();
System.out.println("主线程: 所有阶段完成");
}
}方法一览
| 方法 | 说明 |
|---|---|
Phaser() | 创建无参与者的 Phaser |
Phaser(int parties) | 创建指定初始参与者的 Phaser |
register() | 注册一个新参与者 |
arrive() | 到达但不等待 |
arriveAndAwaitAdvance() | 到达并等待其他参与者 |
arriveAndDeregister() | 到达并退出 Phaser |
bulkRegister(int parties) | 批量注册 |
awaitAdvance(int phase) | 等待到达指定相位 |
onAdvance(phase, parties) | 阶段完成回调(可重写) |
isTerminated() | 检查是否已终止 |
注意事项
- 动态管理:支持运行时注册和取消注册,比 CyclicBarrier 更灵活
- onAdvance() 回调:重写此方法可以在每个阶段完成后执行自定义逻辑
- Termination 状态:当 onAdvance() 返回 true 或所有参与者都取消注册后,Phaser 进入终止状态
- 异常处理:等待中的线程被中断会抛出 InterruptedException
要点回顾
Phaser 适合需要多阶段同步且参与者数量可能动态变化的场景:
- 并行测试框架(setup → execute → teardown)
- 多阶段数据处理流水线
- 动态并行任务
记住:可重置、可动态增减参与者,是 CyclicBarrier 的增强版。
