线程池拒绝策略
当线程池无法接受新任务时,会发生什么?
拒绝策略说了算。
什么时候触发
线程池拒绝新任务只有一种情况:线程池饱和。
什么算饱和?
条件 1:队列满了
条件 2:线程数达到最大值
同时满足 → 触发拒绝策略四种内置策略
| 策略 | 行为 | 后果 |
|---|---|---|
| AbortPolicy | 抛异常 | 任务丢失,调用方知道 |
| CallerRunsPolicy | 调用线程执行 | 任务不丢,调用方变慢 |
| DiscardPolicy | 直接丢弃 | 任务静默丢失 |
| DiscardOldestPolicy | 丢弃最老的 | 丢弃队列头的任务 |
代码演示
场景设置
为了触发拒绝策略,我们用一个小队列和少线程:
java
// 队列容量 = 2,线程数 = 2
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2) // 只能存 2 个任务
);AbortPolicy(默认)
抛异常,任务丢失:
java
public class AbortPolicyDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.AbortPolicy()
);
for (int i = 1; i <= 5; i++) {
final int taskId = i;
try {
executor.submit(() -> {
System.out.println("任务 " + taskId + " 执行");
Thread.sleep(500);
});
} catch (RejectedExecutionException e) {
System.out.println("任务 " + taskId + " 被拒绝");
}
}
executor.shutdown();
}
}输出:
任务 1 执行
任务 2 执行
任务 3 执行(入队)
任务 4 执行(入队)
任务 5 被拒绝CallerRunsPolicy
调用线程自己执行,任务不丢:
java
public class CallerRunsPolicyDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.CallerRunsPolicy()
);
for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务 " + taskId + " 执行 by " +
Thread.currentThread().getName());
Thread.sleep(500);
});
}
executor.shutdown();
executor.awaitTermination(10, TimeUnit.SECONDS);
}
}输出:
任务 1 执行 by pool-1-thread-1
任务 2 执行 by pool-1-thread-1
任务 3 执行 by pool-1-thread-1(CallerRuns)
任务 4 执行 by pool-1-thread-1
任务 5 执行 by pool-1-thread-1(CallerRuns)DiscardPolicy
静默丢弃,不通知:
java
public class DiscardPolicyDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardPolicy()
);
List<Integer> completed = Collections.synchronizedList(new ArrayList<>());
for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.submit(() -> {
completed.add(taskId);
System.out.println("任务 " + taskId + " 完成");
Thread.sleep(500);
});
}
Thread.sleep(4000);
System.out.println("完成的任务: " + completed); // 可能只有 3-4 个
executor.shutdown();
}
}DiscardOldestPolicy
丢弃最老的,给新任务让位:
java
public class DiscardOldestPolicyDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
1, 1, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(2),
new ThreadPoolExecutor.DiscardOldestPolicy()
);
for (int i = 1; i <= 5; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("任务 " + taskId + " 执行");
Thread.sleep(500);
});
}
executor.shutdown();
}
}自定义拒绝策略
根据业务需求自定义处理逻辑:
java
public class CustomRejectPolicy {
private static final Logger log = Logger.getLogger(CustomRejectPolicy.class);
public static RejectedExecutionHandler createLoggingHandler() {
return (r, executor) -> {
log.warn("任务被拒绝,队列已满");
// 可以持久化到数据库
// 可以发送到消息队列
// 可以返回给调用方
throw new RejectedExecutionException("线程池饱和");
};
}
public static RejectedExecutionHandler createRetryHandler() {
return (r, executor) -> {
// 尝试同步执行一次
if (!executor.isShutdown()) {
r.run();
log.info("拒绝后由调用方同步执行");
}
};
}
public static RejectedExecutionHandler createPersistentHandler() {
Queue<Runnable> persistentQueue = new ConcurrentLinkedQueue<>();
return (r, executor) -> {
log.info("任务被拒绝,持久化存储");
persistentQueue.offer(r);
log.info("待重试任务数: " + persistentQueue.size());
// 后续可以用定时任务重试这些任务
};
}
}实际应用:任务持久化
java
public class PersistentTaskExecutor {
private final ThreadPoolExecutor executor;
private final Queue<Runnable> failedTasks = new ConcurrentLinkedQueue<>();
public PersistentTaskExecutor() {
this.executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(100),
(r, ex) -> {
// 拒绝时持久化
failedTasks.offer(r);
System.out.println("任务持久化,当前待重试: " + failedTasks.size());
}
);
}
public void submit(Runnable task) {
executor.execute(task);
}
public void retryFailed() {
Runnable task;
while ((task = failedTasks.poll()) != null) {
executor.execute(task);
}
}
public void shutdown() {
executor.shutdown();
}
}选择建议
业务重要吗?
├── 是 ──► 任务不能丢
│ ├── 期望失败? ──► CallerRunsPolicy(调用方承担)
│ └── 不期望失败? ──► AbortPolicy + 监控告警
│
└── 否 ──► 任务可以丢
├── 需要反馈? ──► CallerRunsPolicy
└── 不需要反馈? ──► DiscardPolicy / DiscardOldestPolicy注意事项
- CallerRunsPolicy 可能导致调用线程阻塞:调用方执行任务时无法继续提交
- DiscardOldestPolicy 会丢弃队列头的任务:可能是重要的待处理任务
- 自定义策略要考虑线程安全:持久化等操作需要线程安全
- 最佳实践:核心业务用 AbortPolicy + 监控告警 + 补偿机制
要点回顾
拒绝策略是线程池的最后一道防线。选择原则:
- 数据重要:AbortPolicy + 补偿
- 需要限流:CallerRunsPolicy
- 任务可丢:DiscardPolicy
