Skip to content

线程池工作原理

"一个线程池是怎么工作的?任务提交后经历了什么?"

要理解线程池,得从源码层面看它的执行流程。

核心组件

ThreadPoolExecutor 内部结构:
┌─────────────────────────────────────────────────────┐
│  ThreadPoolExecutor                                  │
│  ┌───────────────────────────────────────────────┐  │
│  │ ctl (AtomicInteger)                           │  │
│  │ 高3位:线程池状态 | 低29位:线程数量              │  │
│  └───────────────────────────────────────────────┘  │
│  ┌───────────────────────────────────────────────┐  │
│  │ workQueue: BlockingQueue<Runnable>             │  │
│  │ 存放待执行的任务                                 │  │
│  └───────────────────────────────────────────────┘  │
│  ┌───────────────────────────────────────────────┐  │
│  │ workers: HashSet<Worker>                     │  │
│  │ 实际执行任务的 Worker 线程集合                   │  │
│  └───────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────┘

Worker 线程结构

Worker 继承了 AbstractQueuedSynchronizer,实现了 Runnable:

java
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    final Thread thread;           // 执行任务的线程
    Runnable firstTask;            // 第一个任务(可能为 null)
    long completedTasks;           // 该 worker 完成的任务数
}

任务提交

java
public class TaskSubmissionDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
        );

        // 提交一个任务
        executor.execute(() -> {
            System.out.println("任务执行中: " + Thread.currentThread().getName());
        });

        // 提交有返回值的任务
        Future<String> future = executor.submit(() -> {
            Thread.sleep(500);
            return "任务结果";
        });

        try {
            String result = future.get();
            System.out.println("任务结果: " + result);
        } catch (Exception e) {
            e.printStackTrace();
        }

        executor.shutdown();
    }
}

execute() 执行流程

任务提交后,经历了什么?

java
public void execute(Runnable command) {
    if (command == null) throw new NullPointerException();

    int c = ctl.get();

    // 步骤1: 核心线程未满,创建核心线程
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 步骤2: 尝试加入任务队列
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次检查状态,可能其他线程关闭了线程池
        if (!isRunning(recheck) && remove(command))
            reject(command);
        // 如果工作线程为 0,需要创建线程处理队列任务
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
        return;
    }

    // 步骤3: 队列满了,尝试创建临时线程
    else if (!addWorker(command, false))
        // 步骤4: 都失败了,执行拒绝策略
        reject(command);
}

Worker 执行循环

Worker 线程启动后,会不停地从队列获取任务并执行:

java
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;

    try {
        // 循环获取任务并执行
        while (task != null || (task = getTask()) != null) {
            // 获取锁,确保安全
            w.lock();

            try {
                // 执行前钩子
                beforeExecute(wt, task);

                try {
                    // 执行任务
                    task.run();

                    // 执行后钩子
                    afterExecute(task, null);
                } catch (Throwable ex) {
                    afterExecute(task, ex);
                    throw ex;
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
    } finally {
        // 处理完成,清理 worker
        processWorkerExit(w, completedAbruptly);
    }
}

获取任务 getTask()

java
private Runnable getTask() {
    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // 判断是否需要超时回收
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            if (decrementWorkerCount() == 0)
                return null;
            continue;
        }

        // 核心线程不超时,永久阻塞等待
        // 非核心线程超时等待
        Runnable r = timed ?
            workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
            workQueue.take();

        if (r != null)
            return r;
        timedOut = true;
    }
}

线程池状态

RUNNING ──► SHUTDOWN ──► STOP ──► TIDYING ──► TERMINATED
状态说明
RUNNING111接受新任务,处理队列任务
SHUTDOWN000不接受新任务,处理队列任务
STOP001不接受新任务,不处理队列,中断正在执行的任务
TIDYING010所有任务终止,workerCount 为 0
TERMINATED011terminate() 执行完成
java
public class ThreadPoolStateDemo {

    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
        );

        System.out.println("初始状态: " + executor.isShutdown()); // false

        executor.shutdown();
        System.out.println("shutdown() 后: " + executor.isShutdown()); // true
        System.out.println("isTerminating(): " + executor.isTerminating()); // true

        executor.awaitTermination(1, TimeUnit.SECONDS);
        System.out.println("isTerminated(): " + executor.isTerminated());
    }
}

shutdown() vs shutdownNow()

方法行为
shutdown()不再接受新任务,等待队列任务执行完毕
shutdownNow()不再接受新任务,中断正在执行的任务,返回队列中的任务
java
public class ShutdownNowDemo {

    public static void main(String[] args) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
        );

        executor.submit(() -> {
            try {
                Thread.sleep(5000);
            } catch (InterruptedException e) {
                System.out.println("线程被中断");
            }
        });

        List<Runnable> rejected = executor.shutdownNow();
        System.out.println("被拒绝的任务数: " + rejected.size());
    }
}

注意事项

  1. 核心线程默认不会被回收:除非设置 allowCoreThreadTimeOut(true)
  2. 任务执行顺序不保证:大多数队列是 FIFO,但不保证
  3. shutdown() vs shutdownNow()
    • shutdown():不再接受新任务,等待队列任务执行完毕
    • shutdownNow():不再接受新任务,中断正在执行的任务,返回队列中的任务
  4. getTask() 的阻塞获取:非核心线程在超时后会被回收

要点回顾

线程池的核心是一个 ctl 变量和 workers 集合:

  • ctl:高 3 位存状态,低 29 位存线程数
  • workers:实际执行任务的 Worker 线程集合
  • workQueue:存放待执行任务的阻塞队列

Worker 线程在 runWorker() 中循环调用 getTask(),不断从队列获取任务并执行,直到队列为空且线程需要回收。

基于 VitePress 构建