Stream 并行
单核 CPU 的时代,并行是服务器程序员的专利。现在手机都是 8 核了,你的代码用上多核了吗?
Stream 并行,让这件事变得简单。
串行 vs 并行
java
// 串行流(默认)
list.stream()
.filter(...)
.collect(...);
// 并行流
list.parallelStream()
// 或者
list.stream().parallel()
.filter(...)
.collect(...);一行代码,从单核变成多核。原理是把数据分成多份,用 Fork/Join 框架并行处理,最后合并结果。
java
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
long start = System.currentTimeMillis();
long serialSum = numbers.stream()
.mapToLong(n -> {
try { Thread.sleep(1); } catch (Exception e) {} // 模拟耗时
return n;
})
.sum();
System.out.println("串行耗时: " + (System.currentTimeMillis() - start) + "ms");
start = System.currentTimeMillis();
long parallelSum = numbers.parallelStream()
.mapToLong(n -> {
try { Thread.sleep(1); } catch (Exception e) {}
return n;
})
.sum();
System.out.println("并行耗时: " + (System.currentTimeMillis() - start) + "ms");并行流的适用场景
并行不是万能的。数据量不够大、操作不够耗时,开通并行反而因为线程调度开销更慢。
适合并行的场景
java
// 场景一:大数据量处理
IntStream.range(0, 10_000_000)
.parallel()
.filter(n -> n % 2 == 0)
.count();
// 场景二:IO 密集型任务(最典型的场景)
List<String> urls = Arrays.asList(
"https://api.example.com/1",
"https://api.example.com/2",
// ... 1000 个 URL
);
List<String> results = urls.parallelStream()
.map(this::fetchData)
.collect(Collectors.toList());
private String fetchData(String url) {
// 网络请求:并行效果显著
return httpClient.get(url);
}
// 场景三:CPU 密集但可分割的任务
List<Matrix> matrices = generateMatrices();
List<Matrix> results = matrices.parallelStream()
.map(Matrix::multiply) // 矩阵乘法
.collect(Collectors.toList());不适合并行的场景
java
// 场景一:小数据量
Arrays.asList(1, 2, 3).parallelStream()
.map(x -> x * 2) // 开线程池的开销比计算本身还大
.collect(Collectors.toList());
// 场景二:有状态的操作
List<Integer> result = new ArrayList<>();
numbers.parallelStream()
.forEach(n -> {
result.add(n); // ❌ 线程不安全
});
// 场景三:顺序敏感的操作(除非用 forEachOrdered)
List<String> data = Arrays.asList("a", "b", "c", "d", "e");
// ❌ forEach 不保证顺序
data.parallelStream().forEach(System.out::println);
// ✅ forEachOrdered 保证顺序
data.parallelStream().forEachOrdered(System.out::println);有状态 vs 无状态
并行 Stream 要求操作是无状态的。
java
// ✅ 无状态:每个元素的处理不依赖其他元素
list.parallelStream()
.map(String::toUpperCase)
.filter(s -> s.length() > 3)
.collect(Collectors.toList());
// ❌ 有状态:结果依赖之前处理过的元素
list.parallelStream()
.map(s -> {
if (s.startsWith("A")) return "processed";
return s; // 依赖之前的判断结果
})
.collect(Collectors.toList());安全的收集方式
java
// ❌ 不安全:共享可变变量
List<Integer> unsafe = Collections.synchronizedList(new ArrayList<>());
numbers.parallelStream()
.forEach(n -> unsafe.add(n)); // 同步开销大
// ✅ 安全:用 collect 代替 forEach
List<Integer> safe = numbers.parallelStream()
.collect(Collectors.toList());
// ✅ 局部累积(更高效)
Map<String, Integer> wordCount = words.parallelStream()
.collect(Collectors.toMap(
Function.identity(),
s -> 1,
Integer::sum // 合并冲突的值
));有序性处理
并行流可能不保持元素顺序。取决于你的操作:
java
List<String> data = Arrays.asList("a", "b", "c", "d", "e");
// forEach:不保证顺序
data.parallelStream().forEach(System.out::print);
// 输出可能是:eacbd(每次不同)
// forEachOrdered:保持顺序
data.parallelStream().forEachOrdered(System.out::print);
// 输出固定:abcde
// collect:终端操作通常保持顺序
List<String> collected = data.parallelStream()
.collect(Collectors.toList()); // 保持顺序
// 减少操作(如 reduce)可能不保持顺序
Optional<String> reduced = data.parallelStream()
.reduce((a, b) -> a + b); // 不保证顺序线程安全集合
并行处理时,收集结果要选对容器:
java
// ❌ 普通 ArrayList:不是线程安全的
List<Integer> unsafe = new ArrayList<>();
numbers.parallelStream().forEach(n -> unsafe.add(n)); // 可能丢数据
// ✅ 方案一:收集到线程安全的集合
List<Integer> safe1 = numbers.parallelStream()
.collect(Collectors.toList()); // 内部处理好了
// ✅ 方案二:用 synchronizedList
List<Integer> safe2 = Collections.synchronizedList(new ArrayList<>());
numbers.parallelStream().forEach(n -> safe2.add(n)); // 安全
// ✅ 方案三:用 ConcurrentHashMap 累积
Map<String, Integer> counts = new ConcurrentHashMap<>();
words.parallelStream().forEach(w ->
counts.merge(w, 1, Integer::sum));
// ✅ 方案四:用 groupingByConcurrent(JDK 9+)
Map<String, Long> grouped = words.parallelStream()
.collect(Collectors.groupingByConcurrent(Function.identity(), counting()));并行度控制
默认并行度 = CPU 核心数。你也可以自定义:
java
// 设置全局默认并行度
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");
// 用自定义 ForkJoinPool(更可控)
ForkJoinPool customPool = new ForkJoinPool(4);
List<String> result = customPool.submit(() ->
list.parallelStream()
.map(this::processItem)
.collect(Collectors.toList())
).join();
customPool.shutdown();
// 适用场景:Web 服务中不希望并行任务影响主线程池性能对比
用数据说话:
java
import java.util.*;
import java.util.concurrent.*;
public class ParallelBenchmark {
public static void main(String[] args) {
// 生成 1000 万个随机数
Random random = new Random(42);
List<Integer> data = random.ints(10_000_000)
.boxed()
.toList();
// 串行
System.out.println("串行过滤+求和:");
measure(() -> data.stream()
.filter(n -> n > 0)
.mapToLong(n -> n)
.sum());
// 并行
System.out.println("并行过滤+求和:");
measure(() -> data.parallelStream()
.filter(n -> n > 0)
.mapToLong(n -> n)
.sum());
}
private static void measure(LongSupplier action) {
long start = System.currentTimeMillis();
long result = action.getAsLong();
long duration = System.currentTimeMillis() - start;
System.out.println("结果: " + result + ", 耗时: " + duration + "ms");
}
}典型结果(8 核 CPU,处理 1000 万个整数):
| 方式 | 耗时 | 加速比 |
|---|---|---|
| 串行 | ~800ms | 1x |
| 并行 | ~120ms | ~6.5x |
注意事项
- 别把并行当默认:串行流在小数据量时更快
- 注意有序性:如果需要顺序,用
forEachOrdered或检查终端操作是否保持顺序 - 避免有状态操作:无状态操作才能安全并行
- 用对收集器:
toList()、groupingBy、toMap等内置收集器都是线程安全的 - IO 密集型最受益:网络请求、文件读写等场景并行效果最明显
并行 Stream 是提升性能的利器,但要用对场景。数据量小、计算简单、有状态操作——这些场景下串行反而更好。
