Skip to content

线程池监控

线程池上线了,怎么知道它是否健康?

监控是运维的第三只眼。

ThreadPoolExecutor 的监控方法

ThreadPoolExecutor 提供了丰富的监控指标:

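| 方法 | 说明 |
| --- | --- |
| `getActiveCount()` | 当前正在执行任务的线程数(近似值) |
| `getPoolSize()` | 当前线程池中的线程总数 |
| `getCorePoolSize()` | 核心线程数 |
| `getMaximumPoolSize()` | 最大线程数 |
| `getCompletedTaskCount()` | 已完成的任务总数(近似值) |
| `getTaskCount()` | 已调度的任务总数(近似值,含已完成、正在执行和排队中的任务) |
| `getQueue().size()` | 队列中等待执行的任务数 |
| `getQueue().remainingCapacity()` | 队列剩余容量 |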

代码演示

基础监控

java
/**
 * Minimal demonstration of reading a {@link ThreadPoolExecutor}'s built-in
 * monitoring getters before, during and after a small batch of tasks.
 */
public class BasicMonitorDemo {

    /**
     * Submits five one-second tasks to a pool (core 2, max 4, queue capacity 10)
     * and prints the pool statistics at three points in time.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
        );

        try {
            System.out.println("=== 初始状态 ===");
            printStats(executor);

            // Submit tasks
            for (int i = 1; i <= 5; i++) {
                final int taskId = i;
                executor.submit(() -> {
                    try {
                        Thread.sleep(1000);
                        System.out.println("任务 " + taskId + " 完成");
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the worker thread can observe it.
                        Thread.currentThread().interrupt();
                    }
                });
            }

            // Take a snapshot while the first tasks are still sleeping.
            Thread.sleep(500);
            System.out.println("\n=== 任务提交后 ===");
            printStats(executor);

            // Wait long enough for the demo's tasks to drain before the final snapshot.
            Thread.sleep(3000);
            System.out.println("\n=== 所有任务完成后 ===");
            printStats(executor);
        } finally {
            // Shut the pool down even if the main thread is interrupted while sleeping.
            executor.shutdown();
        }
    }

    /**
     * Prints one snapshot of the executor's monitoring getters to standard output.
     *
     * @param executor the pool to inspect
     */
    private static void printStats(ThreadPoolExecutor executor) {
        System.out.println("核心线程数: " + executor.getCorePoolSize());
        System.out.println("最大线程数: " + executor.getMaximumPoolSize());
        System.out.println("当前线程数: " + executor.getPoolSize());
        System.out.println("活跃线程数: " + executor.getActiveCount());
        System.out.println("已完成任务: " + executor.getCompletedTaskCount());
        System.out.println("队列大小: " + executor.getQueue().size());
        // remainingCapacity() is the number of free slots, not the total capacity,
        // so the label says "remaining capacity".
        System.out.println("队列剩余容量: " + executor.getQueue().remainingCapacity());
    }
}

定时监控

java
/**
 * Demonstrates periodic sampling of a {@link ThreadPoolExecutor}: a
 * single-threaded scheduler prints the pool statistics once per second while
 * ten randomly timed tasks are being processed.
 */
public class ScheduledMonitorDemo {

    /**
     * Starts the once-per-second monitor, submits ten tasks, waits eight
     * seconds and then shuts both executors down.
     *
     * @param args unused
     * @throws InterruptedException if the main thread is interrupted while sleeping
     */
    public static void main(String[] args) throws InterruptedException {
        LinkedBlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(10);
        ThreadPoolExecutor executor =
            new ThreadPoolExecutor(2, 4, 60L, TimeUnit.SECONDS, workQueue);

        // Sample the pool once per second, starting immediately.
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        scheduler.scheduleAtFixedRate(() -> printDetailedStats(executor), 0, 1, TimeUnit.SECONDS);

        // Submit tasks
        int nextId = 1;
        while (nextId <= 10) {
            executor.submit(sleepingTask(nextId));
            nextId++;
        }

        Thread.sleep(8000);
        scheduler.shutdown();
        executor.shutdown();
    }

    /**
     * Builds a task that sleeps for a random 500-2499 ms and then reports completion.
     *
     * @param taskId identifier printed in the completion message
     * @return the runnable to hand to the pool
     */
    private static Runnable sleepingTask(int taskId) {
        return () -> {
            try {
                Thread.sleep(new Random().nextInt(2000) + 500);
                System.out.println("任务 " + taskId + " 完成");
            } catch (InterruptedException e) {
                // Restore the interrupt flag for the worker thread.
                Thread.currentThread().interrupt();
            }
        };
    }

    /**
     * Prints a timestamped snapshot: thread counts, task counts and the ratio
     * of current pool size to maximum pool size.
     *
     * @param executor the pool to inspect
     */
    private static void printDetailedStats(ThreadPoolExecutor executor) {
        System.out.println("=== [" + LocalDateTime.now() + "] ===");

        int poolSize = executor.getPoolSize();
        int maxPoolSize = executor.getMaximumPoolSize();
        int active = executor.getActiveCount();
        System.out.printf("线程数: %d/%d (活跃: %d)%n", poolSize, maxPoolSize, active);

        long completed = executor.getCompletedTaskCount();
        int queued = executor.getQueue().size();
        System.out.printf("任务: 完成=%d, 队列=%d%n", completed, queued);

        // Re-read the sizes here, as the pool may have changed since the first line was printed.
        double utilization = executor.getPoolSize() * 100.0 / executor.getMaximumPoolSize();
        System.out.printf("利用率: %.2f%%%n", utilization);
    }
}

告警机制

java
/**
 * Threshold-based health check for a {@link ThreadPoolExecutor}. Each call to
 * {@link #monitor(ThreadPoolExecutor)} takes one snapshot of the pool and emits
 * an alert for every threshold that is reached.
 */
public class AlertingMonitor {

    /** Pool-size alert threshold, as a percentage of the maximum pool size. */
    private static final int THREAD_POOL_SIZE_ALERT_THRESHOLD = 80;  // 80%
    /** Queue backlog alert threshold, as an absolute number of waiting tasks. */
    private static final int QUEUE_SIZE_ALERT_THRESHOLD = 100;

    /**
     * Inspects the executor once and sends an alert for each of: pool size at
     * or above 80% of the maximum, every permitted thread busy, and a queue
     * backlog of at least {@code QUEUE_SIZE_ALERT_THRESHOLD} tasks.
     *
     * @param executor the pool to inspect
     */
    public static void monitor(ThreadPoolExecutor executor) {
        int poolSize = executor.getPoolSize();
        int maxPoolSize = executor.getMaximumPoolSize();
        int activeCount = executor.getActiveCount();
        int queueSize = executor.getQueue().size();

        // Pool usage alert
        double poolUsage = (double) poolSize / maxPoolSize * 100;
        if (poolUsage >= THREAD_POOL_SIZE_ALERT_THRESHOLD) {
            sendAlert("线程池使用率过高: " + String.format("%.1f", poolUsage) + "%");
        }

        // Saturation alert. Use >= rather than ==: the active count can exceed
        // the maximum for a while after setMaximumPoolSize() lowers the limit
        // while workers are still busy, and that state is saturated too.
        if (activeCount >= maxPoolSize) {
            sendAlert("所有工作线程都在忙碌,可能需要扩容");
        }

        // Queue backlog alert
        if (queueSize >= QUEUE_SIZE_ALERT_THRESHOLD) {
            sendAlert("任务队列积压严重: " + queueSize + " 个任务等待");
        }
    }

    /**
     * Emits an alert. This demo writes to standard error; a real deployment
     * would send e-mail, SMS, DingTalk messages and so on.
     *
     * @param message human-readable alert text
     */
    private static void sendAlert(String message) {
        System.err.println("[ALERT] " + message);
    }
}

性能指标收集

java
/**
 * Collects application-level counters (submitted / completed / rejected tasks
 * and total execution time) for a {@link ThreadPoolExecutor} and combines them
 * with the executor's own gauges into a single snapshot map.
 *
 * <p>Counters are {@link LongAdder}s, so the {@code record*} methods may be
 * called from many threads. A snapshot is not atomic across counters and
 * should be treated as approximate while tasks are in flight.
 */
public class MetricsCollector {

    private final ThreadPoolExecutor executor;
    private final LongAdder submitted = new LongAdder();
    private final LongAdder completed = new LongAdder();
    private final LongAdder rejected = new LongAdder();
    private final LongAdder totalTimeNanos = new LongAdder();

    /**
     * @param executor the pool whose live gauges are included in {@link #getMetrics()}
     */
    public MetricsCollector(ThreadPoolExecutor executor) {
        this.executor = executor;
    }

    /** Records that one task was submitted. */
    public void recordSubmit() {
        submitted.increment();
    }

    /**
     * Records that one task finished.
     *
     * @param executionTimeNanos how long the task ran, in nanoseconds
     */
    public void recordComplete(long executionTimeNanos) {
        completed.increment();
        totalTimeNanos.add(executionTimeNanos);
    }

    /** Records that one task was rejected. */
    public void recordReject() {
        rejected.increment();
    }

    /**
     * Builds a snapshot of all metrics.
     *
     * @return a new mutable map with keys {@code submitted}, {@code completed},
     *         {@code rejected} (each a {@code Long}), {@code active},
     *         {@code poolSize}, {@code queueSize} (each an {@code Integer}),
     *         and {@code avgTimeMs} (a {@code Double}, present only when at
     *         least one completion has been recorded)
     */
    public Map<String, Object> getMetrics() {
        // Read the completion counter once, so the reported count and the
        // divisor of the average are always the same number.
        long completedCount = completed.sum();
        long totalNanos = totalTimeNanos.sum();

        Map<String, Object> result = new HashMap<>();
        result.put("submitted", submitted.sum());
        result.put("completed", completedCount);
        result.put("rejected", rejected.sum());

        if (completedCount > 0) {
            // Divide in floating point; long division would truncate the
            // average before it is converted to milliseconds.
            result.put("avgTimeMs", (double) totalNanos / completedCount / 1_000_000.0);
        }

        result.put("active", executor.getActiveCount());
        result.put("poolSize", executor.getPoolSize());
        result.put("queueSize", executor.getQueue().size());

        return result;
    }
}

JMX 监控

java
/**
 * Exposes a {@link ThreadPoolExecutor}'s statistics over JMX so they can be
 * inspected with JConsole or any other JMX client.
 *
 * <p>A {@code ThreadPoolExecutor} cannot be registered with an MBean server
 * directly: it is not a {@code DynamicMBean} and implements no
 * {@code *MBean}/{@code *MXBean} management interface, so {@code registerMBean}
 * rejects it with {@code NotCompliantMBeanException}. This class therefore
 * registers a small read-only MXBean adapter around the executor.
 */
public class JMXMonitorDemo {

    /**
     * Read-only management view of a thread pool. The interface is public and
     * its name ends in {@code MXBean}, which is what makes it a valid MXBean
     * interface.
     */
    public interface ThreadPoolStatsMXBean {
        int getCorePoolSize();

        int getMaximumPoolSize();

        int getPoolSize();

        int getActiveCount();

        long getTaskCount();

        long getCompletedTaskCount();

        int getQueueSize();

        int getQueueRemainingCapacity();
    }

    /** Adapter that forwards every attribute read to the wrapped executor. */
    private static final class ThreadPoolStats implements ThreadPoolStatsMXBean {

        private final ThreadPoolExecutor executor;

        ThreadPoolStats(ThreadPoolExecutor executor) {
            this.executor = executor;
        }

        @Override
        public int getCorePoolSize() {
            return executor.getCorePoolSize();
        }

        @Override
        public int getMaximumPoolSize() {
            return executor.getMaximumPoolSize();
        }

        @Override
        public int getPoolSize() {
            return executor.getPoolSize();
        }

        @Override
        public int getActiveCount() {
            return executor.getActiveCount();
        }

        @Override
        public long getTaskCount() {
            return executor.getTaskCount();
        }

        @Override
        public long getCompletedTaskCount() {
            return executor.getCompletedTaskCount();
        }

        @Override
        public int getQueueSize() {
            return executor.getQueue().size();
        }

        @Override
        public int getQueueRemainingCapacity() {
            return executor.getQueue().remainingCapacity();
        }
    }

    /**
     * Registers a read-only MXBean view of {@code executor} under {@code objectName}.
     *
     * @param mBeanServer the server to register with
     * @param executor    the pool to expose
     * @param objectName  the JMX name to register under
     * @throws javax.management.JMException if the name is already registered
     *         or the registration is otherwise rejected
     */
    static void registerPool(MBeanServer mBeanServer, ThreadPoolExecutor executor,
                             ObjectName objectName) throws javax.management.JMException {
        mBeanServer.registerMBean(new ThreadPoolStats(executor), objectName);
    }

    /**
     * Registers the pool with the platform MBean server, runs ten one-second
     * tasks, then unregisters the MBean and shuts the pool down.
     *
     * @param args unused
     * @throws Exception on JMX registration failure or interruption
     */
    public static void main(String[] args) throws Exception {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
        );

        try {
            // Register the MBean
            ObjectName objectName = new ObjectName("myapp:type=ThreadPoolExecutor,name=WorkerPool");
            MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
            registerPool(mBeanServer, executor, objectName);

            try {
                System.out.println("JMX 监控已启动,请使用 JConsole 连接");

                // Simulated workload
                for (int i = 0; i < 10; i++) {
                    executor.submit(() -> {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            // Restore the interrupt flag for the worker thread.
                            Thread.currentThread().interrupt();
                        }
                    });
                }

                Thread.sleep(5000);
            } finally {
                // Do not leave a stale MBean pointing at a pool that is going away.
                mBeanServer.unregisterMBean(objectName);
            }
        } finally {
            executor.shutdown();
        }
    }
}

监控维度

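| 维度 | 指标 | 告警阈值建议 |
| --- | --- | --- |
| 线程池大小 | `getPoolSize()` | >= 80% 最大线程数 |
| 活跃线程 | `getActiveCount()` | == 最大线程数 |
| 队列积压 | `getQueue().size()` | > 队列容量 80% |
| 任务拒绝 | rejectedCount(ThreadPoolExecutor 没有内置该指标,需在自定义 `RejectedExecutionHandler` 中自行计数) | > 0 |
| 平均执行时间 | avgTime(需自行统计,例如上文的 MetricsCollector) | 超过预期 |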

注意事项

  1. 监控不影响性能:监控操作应该是轻量的
  2. 定期采样:持续监控才能发现趋势问题
  3. 设置告警阈值:提前发现问题
  4. 保留历史数据:用于分析长期趋势
  5. 生产环境:使用专业的监控工具(Prometheus、Grafana 等)

要点回顾

线程池监控的核心是三个指标:

  • 线程数:当前 vs 最大,决定了能处理多少并发
  • 队列大小:积压了多少任务
  • 活跃线程:有多少线程正在干活

结合这些指标,就能判断线程池是否健康。

基于 VitePress 构建