线程池监控
线程池上线了,怎么知道它是否健康?
监控是运维的第三只眼。
ThreadPoolExecutor 的监控方法
ThreadPoolExecutor 提供了丰富的监控指标:
| 方法 | 说明 |
|---|---|
| getActiveCount() | 当前正在执行任务的线程数(近似值) |
| getPoolSize() | 当前线程池中的线程总数 |
| getCorePoolSize() | 核心线程数 |
| getMaximumPoolSize() | 最大线程数 |
| getCompletedTaskCount() | 已完成的任务总数(近似值) |
| getTaskCount() | 已提交的任务总数(近似值,含排队中、正在执行和已完成的) |
| getQueue().size() | 队列中的任务数 |
| getQueue().remainingCapacity() | 队列剩余容量 |
代码演示
基础监控
java
public class BasicMonitorDemo {
/**
 * Runs the basic monitoring demo: prints the pool statistics before any task
 * is submitted, while tasks are in flight, and after they have had time to
 * finish.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the main thread is interrupted while sleeping
 */
public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
    );
    try {
        System.out.println("=== 初始状态 ===");
        printStats(executor);

        // Submit five one-second tasks.
        for (int i = 1; i <= 5; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    Thread.sleep(1000);
                    System.out.println("任务 " + taskId + " 完成");
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the worker thread can observe it.
                    Thread.currentThread().interrupt();
                }
            });
        }

        // Sample while tasks are still running.
        Thread.sleep(500);
        System.out.println("\n=== 任务提交后 ===");
        printStats(executor);

        // Sample again after the tasks have had time to complete.
        Thread.sleep(3000);
        System.out.println("\n=== 所有任务完成后 ===");
        printStats(executor);
    } finally {
        // Shut down even when interrupted: the pool's default (non-daemon)
        // worker threads would otherwise keep the JVM from exiting.
        executor.shutdown();
    }
}
/**
 * Prints a snapshot of the pool's configuration and current load to standard
 * output, one metric per line.
 *
 * @param executor the pool to inspect
 */
private static void printStats(ThreadPoolExecutor executor) {
    System.out.println("核心线程数: " + executor.getCorePoolSize());
    System.out.println("最大线程数: " + executor.getMaximumPoolSize());
    System.out.println("当前线程数: " + executor.getPoolSize());
    System.out.println("活跃线程数: " + executor.getActiveCount());
    System.out.println("已完成任务: " + executor.getCompletedTaskCount());
    System.out.println("队列大小: " + executor.getQueue().size());
    // remainingCapacity() is the number of free slots, not the total capacity,
    // so the label says "remaining capacity".
    System.out.println("队列剩余容量: " + executor.getQueue().remainingCapacity());
}
}
定时监控
java
public class ScheduledMonitorDemo {
/**
 * Runs the scheduled monitoring demo: a single-threaded scheduler prints the
 * pool statistics once per second while ten tasks of random duration run.
 *
 * @param args command-line arguments (unused)
 * @throws InterruptedException if the main thread is interrupted while waiting
 */
public static void main(String[] args) throws InterruptedException {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
    );
    ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
    try {
        // Sample the pool once per second, starting immediately.
        scheduler.scheduleAtFixedRate(() -> {
            printDetailedStats(executor);
        }, 0, 1, TimeUnit.SECONDS);

        // Submit ten tasks that each sleep between 500 ms and 2499 ms.
        for (int i = 1; i <= 10; i++) {
            final int taskId = i;
            executor.submit(() -> {
                try {
                    Thread.sleep(new Random().nextInt(2000) + 500);
                    System.out.println("任务 " + taskId + " 完成");
                } catch (InterruptedException e) {
                    // Restore the interrupt flag so the worker thread can observe it.
                    Thread.currentThread().interrupt();
                }
            });
        }

        Thread.sleep(8000);

        // Ten tasks on two threads can take longer than the 8 s above, so keep
        // the monitor running until the pool has actually drained.
        executor.shutdown();
        if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
            executor.shutdownNow();
        }
    } finally {
        // Stop both pools on every exit path; shutdown() on an already
        // shut-down executor has no additional effect.
        scheduler.shutdown();
        executor.shutdown();
    }
}
/**
 * Prints a timestamped three-line report for the pool: thread counts, task
 * counts, and pool size as a percentage of the maximum pool size.
 *
 * @param executor the pool to inspect
 */
private static void printDetailedStats(ThreadPoolExecutor executor) {
    final String header = "=== [" + LocalDateTime.now() + "] ===";
    System.out.println(header);

    final int threads = executor.getPoolSize();
    final int threadLimit = executor.getMaximumPoolSize();
    final int busy = executor.getActiveCount();
    System.out.printf("线程数: %d/%d (活跃: %d)%n", threads, threadLimit, busy);

    final long finished = executor.getCompletedTaskCount();
    final int waiting = executor.getQueue().size();
    System.out.printf("任务: 完成=%d, 队列=%d%n", finished, waiting);

    // The pool size is read again here, so this line may reflect a slightly
    // newer state than the first line of the report.
    final double usagePercent = executor.getPoolSize() * 100.0 / executor.getMaximumPoolSize();
    System.out.printf("利用率: %.2f%%%n", usagePercent);
}
}
告警机制
java
public class AlertingMonitor {
/** Pool usage (current threads / maximum threads, in percent) at or above which an alert is sent. */
private static final int THREAD_POOL_SIZE_ALERT_THRESHOLD = 80; // 80%
/** Number of queued tasks at or above which a backlog alert is sent. */
private static final int QUEUE_SIZE_ALERT_THRESHOLD = 100;
/**
 * Takes one sample of the pool and sends an alert for each condition that is
 * met: high pool usage, every thread busy, or a large queue backlog.
 *
 * @param executor the pool to check
 */
public static void monitor(ThreadPoolExecutor executor) {
    int poolSize = executor.getPoolSize();
    int maxPoolSize = executor.getMaximumPoolSize();
    int activeCount = executor.getActiveCount();
    int queueSize = executor.getQueue().size();

    // Pool usage alert: current thread count as a percentage of the maximum.
    double poolUsage = (double) poolSize / maxPoolSize * 100;
    if (poolUsage >= THREAD_POOL_SIZE_ALERT_THRESHOLD) {
        sendAlert("线程池使用率过高: " + String.format("%.1f", poolUsage) + "%");
    }

    // Saturation alert. Uses >= rather than ==: after setMaximumPoolSize()
    // lowers the limit, busy threads above the new maximum keep running until
    // they become idle, so the active count can exceed the maximum.
    if (activeCount >= maxPoolSize) {
        sendAlert("所有工作线程都在忙碌,可能需要扩容");
    }

    // Backlog alert: too many tasks waiting in the queue.
    if (queueSize >= QUEUE_SIZE_ALERT_THRESHOLD) {
        sendAlert("任务队列积压严重: " + queueSize + " 个任务等待");
    }
}
/**
 * Writes the alert to standard error with an "[ALERT] " prefix.
 *
 * @param message the alert text
 */
private static void sendAlert(String message) {
    final String line = "[ALERT] " + message;
    System.err.println(line);
    // In a real application: send e-mail, SMS, DingTalk, etc.
}
}
性能指标收集
java
public class MetricsCollector {
/** Pool whose active count, pool size and queue size are read by getMetrics(). */
private final ThreadPoolExecutor executor;
/** Named counters ("submitted", "completed", "rejected", "totalTime"), all registered by the constructor. */
private final Map<String, LongAdder> metrics = new ConcurrentHashMap<>();
/**
 * Creates a collector for the given pool and registers a zeroed counter for
 * each tracked metric.
 *
 * @param executor the pool whose live figures are included in the metrics
 */
public MetricsCollector(ThreadPoolExecutor executor) {
    this.executor = executor;
    // Register every counter up front so later lookups by name always succeed.
    for (String counterName : new String[] {"submitted", "completed", "rejected", "totalTime"}) {
        metrics.put(counterName, new LongAdder());
    }
}
/** Adds one to the "submitted" counter. */
public void recordSubmit() {
    final LongAdder submitted = metrics.get("submitted");
    submitted.increment();
}
/**
 * Adds one to the "completed" counter and adds the given duration to the
 * "totalTime" counter.
 *
 * @param executionTimeNanos the execution time to accumulate, in nanoseconds
 */
public void recordComplete(long executionTimeNanos) {
    final LongAdder completed = metrics.get("completed");
    completed.increment();

    final LongAdder totalTime = metrics.get("totalTime");
    totalTime.add(executionTimeNanos);
}
/** Adds one to the "rejected" counter. */
public void recordReject() {
    final LongAdder rejected = metrics.get("rejected");
    rejected.increment();
}
/**
 * Builds a snapshot of the collected counters together with the pool's
 * current active count, pool size and queue size.
 *
 * <p>The map always contains "submitted", "completed", "rejected", "active",
 * "poolSize" and "queueSize". "avgTimeMs" is present only once at least one
 * completion has been recorded. The counters are read without locking, so
 * the values are approximate while other threads are still recording.
 *
 * @return a new mutable map from metric name to its current value
 */
public Map<String, Object> getMetrics() {
    // Read each counter exactly once so the reported "completed" value and
    // the divisor used for the average are the same number.
    final long submitted = metrics.get("submitted").sum();
    final long completed = metrics.get("completed").sum();
    final long rejected = metrics.get("rejected").sum();
    final long totalTimeNanos = metrics.get("totalTime").sum();

    Map<String, Object> result = new HashMap<>();
    result.put("submitted", submitted);
    result.put("completed", completed);
    result.put("rejected", rejected);
    if (completed > 0) {
        // Divide in floating point: long division would drop the fractional
        // nanoseconds before the conversion to milliseconds.
        result.put("avgTimeMs", (double) totalTimeNanos / completed / 1_000_000.0);
    }
    result.put("active", executor.getActiveCount());
    result.put("poolSize", executor.getPoolSize());
    result.put("queueSize", executor.getQueue().size());
    return result;
}
}
JMX 监控
java
public class JMXMonitorDemo {
/**
 * Runs the JMX demo: exposes the pool's statistics through the platform
 * MBean server so they can be inspected with JConsole, then runs ten
 * one-second tasks.
 *
 * @param args command-line arguments (unused)
 * @throws Exception if the MBean cannot be registered or unregistered, or
 *         the main thread is interrupted while sleeping
 */
public static void main(String[] args) throws Exception {
    ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10)
    );
    try {
        ObjectName objectName = new ObjectName("myapp:type=ThreadPoolExecutor,name=WorkerPool");
        MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer();
        // ThreadPoolExecutor implements no MBean/MXBean interface, so
        // registering it directly fails with NotCompliantMBeanException.
        // Register an MXBean adapter that delegates to the pool instead.
        mBeanServer.registerMBean(new PoolStats(executor), objectName);
        try {
            System.out.println("JMX 监控已启动,请使用 JConsole 连接");

            // Simulated workload.
            for (int i = 0; i < 10; i++) {
                executor.submit(() -> {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag so the worker thread can observe it.
                        Thread.currentThread().interrupt();
                    }
                });
            }
            Thread.sleep(5000);
        } finally {
            mBeanServer.unregisterMBean(objectName);
        }
    } finally {
        // Shut down on every exit path so the worker threads do not keep the JVM alive.
        executor.shutdown();
    }
}

/**
 * Management interface for a thread pool. It is public and its name ends in
 * "MXBean", which is what lets the MBean server accept implementations of it.
 */
public interface PoolStatsMXBean {
    int getActiveCount();

    int getPoolSize();

    int getCorePoolSize();

    int getMaximumPoolSize();

    long getCompletedTaskCount();

    long getTaskCount();

    int getQueueSize();

    int getQueueRemainingCapacity();
}

/** Read-only adapter that exposes a ThreadPoolExecutor's statistics as MXBean attributes. */
private static final class PoolStats implements PoolStatsMXBean {
    private final ThreadPoolExecutor pool;

    PoolStats(ThreadPoolExecutor pool) {
        this.pool = pool;
    }

    @Override
    public int getActiveCount() {
        return pool.getActiveCount();
    }

    @Override
    public int getPoolSize() {
        return pool.getPoolSize();
    }

    @Override
    public int getCorePoolSize() {
        return pool.getCorePoolSize();
    }

    @Override
    public int getMaximumPoolSize() {
        return pool.getMaximumPoolSize();
    }

    @Override
    public long getCompletedTaskCount() {
        return pool.getCompletedTaskCount();
    }

    @Override
    public long getTaskCount() {
        return pool.getTaskCount();
    }

    @Override
    public int getQueueSize() {
        return pool.getQueue().size();
    }

    @Override
    public int getQueueRemainingCapacity() {
        return pool.getQueue().remainingCapacity();
    }
}
}
监控维度
| 维度 | 指标 | 告警阈值建议 |
|---|---|---|
| 线程池大小 | getPoolSize() | >= 80% 最大线程数 |
| 活跃线程 | getActiveCount() | == 最大线程数 |
| 队列积压 | getQueue().size() | > 队列容量 80% |
| 任务拒绝 | rejectedCount | > 0 |
| 平均执行时间 | avgTime | 超过预期 |
注意事项
- 监控不影响性能:监控操作应该是轻量的(getActiveCount()、getPoolSize() 等方法内部会获取线程池的主锁,采样频率不宜过高)
- 定期采样:持续监控才能发现趋势问题
- 设置告警阈值:提前发现问题
- 保留历史数据:用于分析长期趋势
- 生产环境:使用专业的监控工具(Prometheus、Grafana 等)
要点回顾
线程池监控的核心是三个指标:
- 线程数:当前 vs 最大,决定了能处理多少并发
- 队列大小:积压了多少任务
- 活跃线程:有多少线程正在干活
结合这些指标,就能判断线程池是否健康。
