线程池工作原理
"一个线程池是怎么工作的?任务提交后经历了什么?"
要理解线程池,得从源码层面看它的执行流程。
核心组件
ThreadPoolExecutor 内部结构:
┌─────────────────────────────────────────────────────┐
│ ThreadPoolExecutor │
│ ┌───────────────────────────────────────────────┐ │
│ │ ctl (AtomicInteger) │ │
│ │ 高3位:线程池状态 | 低29位:线程数量 │ │
│ └───────────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────────┐ │
│ │ workQueue: BlockingQueue<Runnable> │ │
│ │ 存放待执行的任务 │ │
│ └───────────────────────────────────────────────┘ │
│ ┌───────────────────────────────────────────────┐ │
│ │ workers: HashSet<Worker> │ │
│ │ 实际执行任务的 Worker 线程集合 │ │
│ └───────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────┘Worker 线程结构
Worker 继承了 AbstractQueuedSynchronizer,实现了 Runnable:
java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread; // 执行任务的线程
Runnable firstTask; // 第一个任务(可能为 null)
long completedTasks; // 该 worker 完成的任务数
}任务提交
java
public class TaskSubmissionDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10)
);
// 提交一个任务
executor.execute(() -> {
System.out.println("任务执行中: " + Thread.currentThread().getName());
});
// 提交有返回值的任务
Future<String> future = executor.submit(() -> {
Thread.sleep(500);
return "任务结果";
});
try {
String result = future.get();
System.out.println("任务结果: " + result);
} catch (Exception e) {
e.printStackTrace();
}
executor.shutdown();
}
}execute() 执行流程
任务提交后,经历了什么?
java
public void execute(Runnable command) {
if (command == null) throw new NullPointerException();
int c = ctl.get();
// 步骤1: 核心线程未满,创建核心线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 步骤2: 尝试加入任务队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 再次检查状态,可能其他线程关闭了线程池
if (!isRunning(recheck) && remove(command))
reject(command);
// 如果工作线程为 0,需要创建线程处理队列任务
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
return;
}
// 步骤3: 队列满了,尝试创建临时线程
else if (!addWorker(command, false))
// 步骤4: 都失败了,执行拒绝策略
reject(command);
}Worker 执行循环
Worker 线程启动后,会不停地从队列获取任务并执行:
java
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
try {
// 循环获取任务并执行
while (task != null || (task = getTask()) != null) {
// 获取锁,确保安全
w.lock();
try {
// 执行前钩子
beforeExecute(wt, task);
try {
// 执行任务
task.run();
// 执行后钩子
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
} finally {
// 处理完成,清理 worker
processWorkerExit(w, completedAbruptly);
}
}获取任务 getTask()
java
private Runnable getTask() {
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 判断是否需要超时回收
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
if (decrementWorkerCount() == 0)
return null;
continue;
}
// 核心线程不超时,永久阻塞等待
// 非核心线程超时等待
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
}
}线程池状态
RUNNING ──► SHUTDOWN ──► STOP ──► TIDYING ──► TERMINATED| 状态 | 值 | 说明 |
|---|---|---|
| RUNNING | 111 | 接受新任务,处理队列任务 |
| SHUTDOWN | 000 | 不接受新任务,处理队列任务 |
| STOP | 001 | 不接受新任务,不处理队列,中断正在执行的任务 |
| TIDYING | 010 | 所有任务终止,workerCount 为 0 |
| TERMINATED | 011 | terminate() 执行完成 |
java
public class ThreadPoolStateDemo {
public static void main(String[] args) throws InterruptedException {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10)
);
System.out.println("初始状态: " + executor.isShutdown()); // false
executor.shutdown();
System.out.println("shutdown() 后: " + executor.isShutdown()); // true
System.out.println("isTerminating(): " + executor.isTerminating()); // true
executor.awaitTermination(1, TimeUnit.SECONDS);
System.out.println("isTerminated(): " + executor.isTerminated());
}
}shutdown() vs shutdownNow()
| 方法 | 行为 |
|---|---|
| shutdown() | 不再接受新任务,等待队列任务执行完毕 |
| shutdownNow() | 不再接受新任务,中断正在执行的任务,返回队列中的任务 |
java
public class ShutdownNowDemo {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
2, 4, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10)
);
executor.submit(() -> {
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
System.out.println("线程被中断");
}
});
List<Runnable> rejected = executor.shutdownNow();
System.out.println("被拒绝的任务数: " + rejected.size());
}
}注意事项
- 核心线程默认不会被回收:除非设置
allowCoreThreadTimeOut(true) - 任务执行顺序不保证:大多数队列是 FIFO,但不保证
- shutdown() vs shutdownNow():
- shutdown():不再接受新任务,等待队列任务执行完毕
- shutdownNow():不再接受新任务,中断正在执行的任务,返回队列中的任务
- getTask() 的阻塞获取:非核心线程在超时后会被回收
要点回顾
线程池的核心是一个 ctl 变量和 workers 集合:
- ctl:高 3 位存状态,低 29 位存线程数
- workers:实际执行任务的 Worker 线程集合
- workQueue:存放待执行任务的阻塞队列
Worker 线程在 runWorker() 中循环调用 getTask(),不断从队列获取任务并执行,直到队列为空且线程需要回收。
