Skip to content

Executors 线程池工厂

Executors 是 JDK 1.5 引入的线程池工厂类,提供了一键创建线程池的便捷方法。

但阿里规范说了:不要用。

为什么?看完你就明白了。

四种工厂方法

方法类型corePoolSizemaximumPoolSize队列
newFixedThreadPool固定大小nnLinkedBlockingQueue (无界)
newSingleThreadExecutor单线程11LinkedBlockingQueue (无界)
newCachedThreadPool缓存0Integer.MAX_VALUESynchronousQueue
newScheduledThreadPool调度corePoolSizeInteger.MAX_VALUEDelayedWorkQueue

代码演示

newFixedThreadPool:固定大小

java
public class FixedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(3);

        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务 " + taskId + " 由 " +
                    Thread.currentThread().getName() + " 执行");
                try {
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }

        executor.shutdown();
    }
}

输出示例(注意线程复用):

任务 1 由 pool-1-thread-1 执行
任务 2 由 pool-1-thread-2 执行
任务 3 由 pool-1-thread-3 执行
任务 4 由 pool-1-thread-1 执行   ← 复用线程
任务 5 由 pool-1-thread-2 执行   ← 复用线程
...

newSingleThreadExecutor:单线程

保证所有任务按提交顺序串行执行

java
public class SingleThreadExecutorDemo {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();

        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务 " + taskId + " 执行");
            });
        }

        executor.shutdown();
    }
}

newCachedThreadPool:缓存

按需创建线程,线程空闲 60 秒后回收:

java
public class CachedThreadPoolDemo {

    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();

        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务 " + taskId + " 由 " +
                    Thread.currentThread().getName() + " 执行");
            });
        }

        executor.shutdown();
    }
}

newScheduledThreadPool:定时调度

java
public class ScheduledThreadPoolDemo {

    public static void main(String[] args) {
        ScheduledExecutorService scheduler =
            Executors.newScheduledThreadPool(3);

        // 延迟 1 秒后执行一次
        scheduler.schedule(() -> {
            System.out.println("延迟 1 秒执行");
        }, 1, TimeUnit.SECONDS);

        // 延迟 2 秒后开始,之后每 3 秒执行一次(固定速率)
        scheduler.scheduleAtFixedRate(() -> {
            System.out.println("固定速率任务执行: " + System.currentTimeMillis());
        }, 2, 3, TimeUnit.SECONDS);

        // 延迟 3 秒后开始,之后每次等上次任务完成后延迟 2 秒执行
        scheduler.scheduleWithFixedDelay(() -> {
            System.out.println("固定延迟任务执行: " + System.currentTimeMillis());
            try { Thread.sleep(1); } catch (InterruptedException e) {}
        }, 3, 2, TimeUnit.SECONDS);

        // 关闭调度器
        scheduler.shutdown();
    }
}

阿里巴巴规范的警告

为什么不能直接用 Executors?

问题一:无界队列导致 OOM

java
// newFixedThreadPool 内部实现
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(
        nThreads,
        nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<>()  // 无界队列!
    );
}

如果任务提交速度超过处理速度,队列会无限增长,最终 OOM。

问题二:最大线程数无界

java
// newCachedThreadPool 内部实现
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(
        0,                           // core = 0
        Integer.MAX_VALUE,            // max = 无界!
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<>()
    );
}

如果任务都是阻塞的(等待 I/O),线程会不断创建,最终耗尽系统资源。

推荐做法

java
public class CustomThreadPoolDemo {

    public static void main(String[] args) {
        // 推荐:显式创建线程池
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            10,                      // 核心线程数
            20,                      // 最大线程数
            60L, TimeUnit.SECONDS,   // 空闲存活时间
            new LinkedBlockingQueue<>(100), // 有界队列
            new ThreadFactory() {
                private AtomicInteger count = new AtomicInteger(1);
                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName("MyPool-Worker-" + count.getAndIncrement());
                    t.setDaemon(false);
                    return t;
                }
            },
            new ThreadPoolExecutor.AbortPolicy()  // 拒绝策略
        );

        // 提交任务
        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务 " + taskId + " 执行");
            });
        }

        executor.shutdown();
    }
}

实际应用:批量处理

java
public class BatchProcessingDemo {

    private static final int BATCH_SIZE = 100;
    private static final int THREAD_COUNT = 4;

    public static void main(String[] args) throws InterruptedException {
        ExecutorService executor = new ThreadPoolExecutor(
            THREAD_COUNT,
            THREAD_COUNT,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(BATCH_SIZE * 2),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        // 模拟批量处理
        List<Integer> items = IntStream.range(1, 501).boxed()
            .collect(Collectors.toList());
        List<List<Integer>> batches = Lists.partition(items, BATCH_SIZE);

        CountDownLatch latch = new CountDownLatch(batches.size());

        for (List<Integer> batch : batches) {
            executor.submit(() -> {
                try {
                    processBatch(batch);
                } finally {
                    latch.countDown();
                }
            });
        }

        latch.await();
        executor.shutdown();
        System.out.println("所有批次处理完成");
    }

    private static void processBatch(List<Integer> batch) {
        System.out.println("处理批次: " + batch);
    }
}

总结对比

工厂方法适用场景风险
newFixedThreadPool固定并发数队列无界可能 OOM
newSingleThreadExecutor顺序执行队列无界可能 OOM
newCachedThreadPool短期异步任务线程数无界
newScheduledThreadPool定时/周期任务线程数无界

结论:除非你能确保任务量可控,否则不要用 Executors。

基于 VitePress 构建