CyclicBarrier
CountDownLatch 是"一等多"。
但如果线程之间是互相等待呢?
CyclicBarrier 干这个。
核心思想
想象多人游戏开始前的场景:
玩家1 ──► 等待 ──► 等待 ──► ...
玩家2 ──► 等待 ──► 等待 ──► ...
玩家3 ──► 等待 ──► 等待 ──► ...
│
所有人都到达 ──► 游戏开始!1
2
3
4
5
2
3
4
5
每个玩家都要等待其他玩家,所有人到达后才能继续。
这就是 CyclicBarrier 的本质。
CyclicBarrier vs CountDownLatch
| 特性 | CyclicBarrier | CountDownLatch |
|---|---|---|
| 重置 | 可重置 | 不可重置 |
| 等待方向 | 线程互相等待 | 主线程等待子线程 |
| 触发时机 | 达到 N 个 await() | 计数器减到 0 |
| 计数方向 | 增加到 N | 减到 0 |
代码演示
基础用法
java
public class CyclicBarrierDemo {
public static void main(String[] args) {
int partyCount = 3;
CyclicBarrier barrier = new CyclicBarrier(partyCount);
System.out.println("开始多线程分阶段执行\n");
for (int i = 0; i < partyCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
// 第一阶段
System.out.println("线程 " + threadId + " 执行第一阶段");
Thread.sleep((long) (Math.random() * 1000 + 500));
System.out.println("线程 " + threadId + " 到达屏障1,等待其他线程");
barrier.await();
// 第二阶段(所有线程同时开始)
System.out.println("线程 " + threadId + " 开始执行第二阶段");
Thread.sleep((long) (Math.random() * 1000 + 500));
System.out.println("线程 " + threadId + " 到达屏障2,等待其他线程");
barrier.await();
// 第三阶段
System.out.println("线程 " + threadId + " 开始执行第三阶段");
Thread.sleep((long) (Math.random() * 500 + 200));
System.out.println("线程 " + threadId + " 执行完毕");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
带屏障动作
java
public class CyclicBarrierWithAction {
public static void main(String[] args) {
int partyCount = 3;
CyclicBarrier barrier = new CyclicBarrier(partyCount, () -> {
System.out.println("\n>>> 屏障动作执行:所有线程已到达,准备进入下一阶段");
});
for (int i = 0; i < partyCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
// 阶段1:数据准备
System.out.println("线程 " + threadId + " 准备数据...");
Thread.sleep((long) (Math.random() * 500 + 300));
System.out.println("线程 " + threadId + " 数据准备完成");
barrier.await();
// 阶段2:数据处理
System.out.println("线程 " + threadId + " 处理数据...");
Thread.sleep((long) (Math.random() * 500 + 300));
System.out.println("线程 " + threadId + " 数据处理完成");
barrier.await();
// 阶段3:结果汇总
System.out.println("线程 " + threadId + " 汇总结果...");
Thread.sleep((long) (Math.random() * 300 + 100));
System.out.println("线程 " + threadId + " 汇总完成");
barrier.await();
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
实际应用:并行排序
java
public class ParallelSortDemo {
public static void main(String[] args) throws InterruptedException {
int[] array = {5, 2, 8, 1, 9, 3, 7, 4, 6};
int threadCount = 3;
int chunkSize = (int) Math.ceil((double) array.length / threadCount);
CyclicBarrier barrier = new CyclicBarrier(threadCount, () -> {
System.out.println("所有分片排序完成,合并结果");
});
int[][] chunks = new int[threadCount][];
for (int i = 0; i < threadCount; i++) {
int start = i * chunkSize;
int end = Math.min(start + chunkSize, array.length);
chunks[i] = Arrays.copyOfRange(array, start, end);
}
Thread[] threads = new Thread[threadCount];
for (int i = 0; i < threadCount; i++) {
final int[] chunk = chunks[i];
final int threadId = i;
threads[i] = new Thread(() -> {
try {
// 排序阶段
System.out.println("线程 " + threadId + " 排序: " + Arrays.toString(chunk));
Arrays.sort(chunk);
System.out.println("线程 " + threadId + " 排序完成: " + Arrays.toString(chunk));
barrier.await();
// 合并阶段
System.out.println("线程 " + threadId + " 参与合并");
} catch (Exception e) {
e.printStackTrace();
}
});
threads[i].start();
}
for (Thread t : threads) {
t.join();
}
// 合并结果
int[] result = Arrays.stream(chunks)
.flatMapToInt(Arrays::stream)
.sorted()
.toArray();
System.out.println("\n最终排序结果: " + Arrays.toString(result));
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
循环使用
java
public class CyclicBarrierCyclicUse {
public static void main(String[] args) throws InterruptedException {
int partyCount = 3;
int cycleCount = 2;
CyclicBarrier barrier = new CyclicBarrier(partyCount);
for (int i = 0; i < partyCount; i++) {
final int threadId = i;
new Thread(() -> {
try {
for (int cycle = 1; cycle <= cycleCount; cycle++) {
System.out.println("线程 " + threadId + " 完成第 " + cycle + " 轮");
Thread.sleep((long) (Math.random() * 300 + 100));
barrier.await();
System.out.println("线程 " + threadId + " 进入第 " + cycle + " 轮下一阶段");
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
方法一览
| 方法 | 说明 |
|---|---|
CyclicBarrier(int parties) | 构造方法,指定参与线程数 |
CyclicBarrier(int parties, Runnable action) | 指定参与线程数和屏障动作 |
await() | 等待所有线程到达屏障 |
await(long timeout, TimeUnit) | 等待指定时间 |
reset() | 重置屏障到初始状态 |
getNumberWaiting() | 获取当前正在等待的线程数 |
isBroken() | 检查屏障是否被破坏 |
注意事项
- BrokenBarrierException:如果一个线程在等待时被中断,或 barrier 被 reset,其他等待的线程会抛出 BrokenBarrierException
- 屏障重置:
reset()会导致正在等待的线程抛出 BrokenBarrierException - 异常处理:等待中的线程如果被中断会抛出 InterruptedException
- 线程数量必须匹配:构造函数中的 party 数量应与实际调用 await() 的线程数匹配
java
// 处理 BrokenBarrierException
CyclicBarrier barrier = new CyclicBarrier(3);
for (int i = 0; i < 3; i++) {
new Thread(() -> {
try {
barrier.await();
} catch (BrokenBarrierException e) {
System.out.println("屏障已损坏");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
要点回顾
CyclicBarrier 适用于"多互等"的场景:
- 多线程分阶段执行
- 多人游戏准备开始
- 并行计算合并结果
记住:可重置,可以循环使用。
