Skip to content

线程池拒绝策略

当线程池无法接受新任务时,会发生什么?

拒绝策略说了算。

什么时候触发

线程池拒绝新任务只有一种情况:线程池饱和

什么算饱和?

条件 1:队列满了
条件 2:线程数达到最大值
同时满足 → 触发拒绝策略

四种内置策略

策略行为后果
AbortPolicy抛异常任务丢失,调用方知道
CallerRunsPolicy调用线程执行任务不丢,调用方变慢
DiscardPolicy直接丢弃任务静默丢失
DiscardOldestPolicy丢弃最老的丢弃队列头的任务

代码演示

场景设置

为了触发拒绝策略,我们用一个小队列和少线程:

java
// 队列容量 = 2,线程数 = 2
ThreadPoolExecutor executor = new ThreadPoolExecutor(
    1, 1, 60L, TimeUnit.SECONDS,
    new LinkedBlockingQueue<>(2)  // 只能存 2 个任务
);

AbortPolicy(默认)

抛异常,任务丢失:

java
public class AbortPolicyDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(2),
            new ThreadPoolExecutor.AbortPolicy()
        );

        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            try {
                executor.submit(() -> {
                    System.out.println("任务 " + taskId + " 执行");
                    Thread.sleep(500);
                });
            } catch (RejectedExecutionException e) {
                System.out.println("任务 " + taskId + " 被拒绝");
            }
        }

        executor.shutdown();
    }
}

输出:

任务 1 执行
任务 2 执行
任务 3 执行(入队)
任务 4 执行(入队)
任务 5 被拒绝

CallerRunsPolicy

调用线程自己执行,任务不丢:

java
public class CallerRunsPolicyDemo {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(2),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );

        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务 " + taskId + " 执行 by " +
                    Thread.currentThread().getName());
                Thread.sleep(500);
            });
        }

        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}

输出:

任务 1 执行 by pool-1-thread-1
任务 2 执行 by pool-1-thread-1
任务 3 执行 by pool-1-thread-1(CallerRuns)
任务 4 执行 by pool-1-thread-1
任务 5 执行 by pool-1-thread-1(CallerRuns)

DiscardPolicy

静默丢弃,不通知:

java
public class DiscardPolicyDemo {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(2),
            new ThreadPoolExecutor.DiscardPolicy()
        );

        List<Integer> completed = Collections.synchronizedList(new ArrayList<>());

        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                completed.add(taskId);
                System.out.println("任务 " + taskId + " 完成");
                Thread.sleep(500);
            });
        }

        Thread.sleep(4000);
        System.out.println("完成的任务: " + completed);  // 可能只有 3-4 个
        executor.shutdown();
    }
}

DiscardOldestPolicy

丢弃最老的,给新任务让位:

java
public class DiscardOldestPolicyDemo {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(2),
            new ThreadPoolExecutor.DiscardOldestPolicy()
        );

        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                System.out.println("任务 " + taskId + " 执行");
                Thread.sleep(500);
            });
        }

        executor.shutdown();
    }
}

自定义拒绝策略

根据业务需求自定义处理逻辑:

java
public class CustomRejectPolicy {

    private static final Logger log = Logger.getLogger(CustomRejectPolicy.class);

    public static RejectedExecutionHandler createLoggingHandler() {
        return (r, executor) -> {
            log.warn("任务被拒绝,队列已满");
            // 可以持久化到数据库
            // 可以发送到消息队列
            // 可以返回给调用方
            throw new RejectedExecutionException("线程池饱和");
        };
    }

    public static RejectedExecutionHandler createRetryHandler() {
        return (r, executor) -> {
            // 尝试同步执行一次
            if (!executor.isShutdown()) {
                r.run();
                log.info("拒绝后由调用方同步执行");
            }
        };
    }

    public static RejectedExecutionHandler createPersistentHandler() {
        Queue<Runnable> persistentQueue = new ConcurrentLinkedQueue<>();

        return (r, executor) -> {
            log.info("任务被拒绝,持久化存储");
            persistentQueue.offer(r);
            log.info("待重试任务数: " + persistentQueue.size());
            // 后续可以用定时任务重试这些任务
        };
    }
}

实际应用:任务持久化

java
public class PersistentTaskExecutor {

    private final ThreadPoolExecutor executor;
    private final Queue<Runnable> failedTasks = new ConcurrentLinkedQueue<>();

    public PersistentTaskExecutor() {
        this.executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(100),
            (r, ex) -> {
                // 拒绝时持久化
                failedTasks.offer(r);
                System.out.println("任务持久化,当前待重试: " + failedTasks.size());
            }
        );
    }

    public void submit(Runnable task) {
        executor.execute(task);
    }

    public void retryFailed() {
        Runnable task;
        while ((task = failedTasks.poll()) != null) {
            executor.execute(task);
        }
    }

    public void shutdown() {
        executor.shutdown();
    }
}

选择建议

业务重要吗?
  ├── 是 ──► 任务不能丢
  │         ├── 期望失败? ──► CallerRunsPolicy(调用方承担)
  │         └── 不期望失败? ──► AbortPolicy + 监控告警

  └── 否 ──► 任务可以丢
            ├── 需要反馈? ──► CallerRunsPolicy
            └── 不需要反馈? ──► DiscardPolicy / DiscardOldestPolicy

注意事项

  1. CallerRunsPolicy 可能导致调用线程阻塞:调用方执行任务时无法继续提交
  2. DiscardOldestPolicy 会丢弃队列头的任务:可能是重要的待处理任务
  3. 自定义策略要考虑线程安全:持久化等操作需要线程安全
  4. 最佳实践:核心业务用 AbortPolicy + 监控告警 + 补偿机制

要点回顾

拒绝策略是线程池的最后一道防线。选择原则:

  • 数据重要:AbortPolicy + 补偿
  • 需要限流:CallerRunsPolicy
  • 任务可丢:DiscardPolicy

基于 VitePress 构建