Skip to content

Stream 并行

单核 CPU 的时代,并行是服务器程序员的专利。现在手机都是 8 核了,你的代码用上多核了吗?

Stream 并行,让这件事变得简单。

串行 vs 并行

java
// 串行流(默认)
list.stream()
    .filter(...)
    .collect(...);

// 并行流
list.parallelStream()
    // 或者
list.stream().parallel()
    .filter(...)
    .collect(...);

一行代码,从单核变成多核。原理是把数据分成多份,用 Fork/Join 框架并行处理,最后合并结果。

java
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);

long start = System.currentTimeMillis();
long serialSum = numbers.stream()
    .mapToLong(n -> {
        try { Thread.sleep(1); } catch (Exception e) {} // 模拟耗时
        return n;
    })
    .sum();
System.out.println("串行耗时: " + (System.currentTimeMillis() - start) + "ms");

start = System.currentTimeMillis();
long parallelSum = numbers.parallelStream()
    .mapToLong(n -> {
        try { Thread.sleep(1); } catch (Exception e) {}
        return n;
    })
    .sum();
System.out.println("并行耗时: " + (System.currentTimeMillis() - start) + "ms");

并行流的适用场景

并行不是万能的。数据量不够大、操作不够耗时,开通并行反而因为线程调度开销更慢。

适合并行的场景

java
// 场景一:大数据量处理
IntStream.range(0, 10_000_000)
    .parallel()
    .filter(n -> n % 2 == 0)
    .count();

// 场景二:IO 密集型任务(最典型的场景)
List<String> urls = Arrays.asList(
    "https://api.example.com/1",
    "https://api.example.com/2",
    // ... 1000 个 URL
);

List<String> results = urls.parallelStream()
    .map(this::fetchData)
    .collect(Collectors.toList());

private String fetchData(String url) {
    // 网络请求:并行效果显著
    return httpClient.get(url);
}

// 场景三:CPU 密集但可分割的任务
List<Matrix> matrices = generateMatrices();
List<Matrix> results = matrices.parallelStream()
    .map(Matrix::multiply)  // 矩阵乘法
    .collect(Collectors.toList());

不适合并行的场景

java
// 场景一:小数据量
Arrays.asList(1, 2, 3).parallelStream()
    .map(x -> x * 2)  // 开线程池的开销比计算本身还大
    .collect(Collectors.toList());

// 场景二:有状态的操作
List<Integer> result = new ArrayList<>();
numbers.parallelStream()
    .forEach(n -> {
        result.add(n);  // ❌ 线程不安全
    });

// 场景三:顺序敏感的操作(除非用 forEachOrdered)
List<String> data = Arrays.asList("a", "b", "c", "d", "e");

// ❌ forEach 不保证顺序
data.parallelStream().forEach(System.out::println);

// ✅ forEachOrdered 保证顺序
data.parallelStream().forEachOrdered(System.out::println);

有状态 vs 无状态

并行 Stream 要求操作是无状态的。

java
// ✅ 无状态:每个元素的处理不依赖其他元素
list.parallelStream()
    .map(String::toUpperCase)
    .filter(s -> s.length() > 3)
    .collect(Collectors.toList());

// ❌ 有状态:结果依赖之前处理过的元素
list.parallelStream()
    .map(s -> {
        if (s.startsWith("A")) return "processed";
        return s;  // 依赖之前的判断结果
    })
    .collect(Collectors.toList());

安全的收集方式

java
// ❌ 不安全:共享可变变量
List<Integer> unsafe = Collections.synchronizedList(new ArrayList<>());
numbers.parallelStream()
    .forEach(n -> unsafe.add(n));  // 同步开销大

// ✅ 安全:用 collect 代替 forEach
List<Integer> safe = numbers.parallelStream()
    .collect(Collectors.toList());

// ✅ 局部累积(更高效)
Map<String, Integer> wordCount = words.parallelStream()
    .collect(Collectors.toMap(
        Function.identity(),
        s -> 1,
        Integer::sum  // 合并冲突的值
    ));

有序性处理

并行流可能不保持元素顺序。取决于你的操作:

java
List<String> data = Arrays.asList("a", "b", "c", "d", "e");

// forEach:不保证顺序
data.parallelStream().forEach(System.out::print);
// 输出可能是:eacbd(每次不同)

// forEachOrdered:保持顺序
data.parallelStream().forEachOrdered(System.out::print);
// 输出固定:abcde

// collect:终端操作通常保持顺序
List<String> collected = data.parallelStream()
    .collect(Collectors.toList());  // 保持顺序

// 减少操作(如 reduce)可能不保持顺序
Optional<String> reduced = data.parallelStream()
    .reduce((a, b) -> a + b);  // 不保证顺序

线程安全集合

并行处理时,收集结果要选对容器:

java
// ❌ 普通 ArrayList:不是线程安全的
List<Integer> unsafe = new ArrayList<>();
numbers.parallelStream().forEach(n -> unsafe.add(n));  // 可能丢数据

// ✅ 方案一:收集到线程安全的集合
List<Integer> safe1 = numbers.parallelStream()
    .collect(Collectors.toList());  // 内部处理好了

// ✅ 方案二:用 synchronizedList
List<Integer> safe2 = Collections.synchronizedList(new ArrayList<>());
numbers.parallelStream().forEach(n -> safe2.add(n));  // 安全

// ✅ 方案三:用 ConcurrentHashMap 累积
Map<String, Integer> counts = new ConcurrentHashMap<>();
words.parallelStream().forEach(w -> 
    counts.merge(w, 1, Integer::sum));

// ✅ 方案四:用 groupingByConcurrent(JDK 9+)
Map<String, Long> grouped = words.parallelStream()
    .collect(Collectors.groupingByConcurrent(Function.identity(), counting()));

并行度控制

默认并行度 = CPU 核心数。你也可以自定义:

java
// 设置全局默认并行度
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "8");

// 用自定义 ForkJoinPool(更可控)
ForkJoinPool customPool = new ForkJoinPool(4);

List<String> result = customPool.submit(() ->
    list.parallelStream()
        .map(this::processItem)
        .collect(Collectors.toList())
).join();

customPool.shutdown();

// 适用场景:Web 服务中不希望并行任务影响主线程池

性能对比

用数据说话:

java
import java.util.*;
import java.util.concurrent.*;

public class ParallelBenchmark {
    
    public static void main(String[] args) {
        // 生成 1000 万个随机数
        Random random = new Random(42);
        List<Integer> data = random.ints(10_000_000)
            .boxed()
            .toList();
        
        // 串行
        System.out.println("串行过滤+求和:");
        measure(() -> data.stream()
            .filter(n -> n > 0)
            .mapToLong(n -> n)
            .sum());
        
        // 并行
        System.out.println("并行过滤+求和:");
        measure(() -> data.parallelStream()
            .filter(n -> n > 0)
            .mapToLong(n -> n)
            .sum());
    }
    
    private static void measure(LongSupplier action) {
        long start = System.currentTimeMillis();
        long result = action.getAsLong();
        long duration = System.currentTimeMillis() - start;
        System.out.println("结果: " + result + ", 耗时: " + duration + "ms");
    }
}

典型结果(8 核 CPU,处理 1000 万个整数):

方式耗时加速比
串行~800ms1x
并行~120ms~6.5x

注意事项

  1. 别把并行当默认:串行流在小数据量时更快
  2. 注意有序性:如果需要顺序,用 forEachOrdered 或检查终端操作是否保持顺序
  3. 避免有状态操作:无状态操作才能安全并行
  4. 用对收集器toList()groupingBytoMap 等内置收集器都是线程安全的
  5. IO 密集型最受益:网络请求、文件读写等场景并行效果最明显

并行 Stream 是提升性能的利器,但要用对场景。数据量小、计算简单、有状态操作——这些场景下串行反而更好。

基于 VitePress 构建