高并发实践
并发编程,最终要落在「高并发」三个字上。
QPS 怎么提升?线程数怎么配置?锁怎么减少?缓存怎么用?本文聚焦实战,帮你把并发知识转化为生产级代码。
核心性能指标
理解这些指标,是高并发优化的基础:
| 指标 | 含义 | 优化方向 |
|---|---|---|
| QPS | 每秒查询数 | 吞吐量 |
| TPS | 每秒事务数 | 吞吐量 |
| RT | 响应时间 | 延迟 |
| 并发数 | 同时处理的请求数 | 并发能力 |
| 吞吐量 | 单位时间处理的数据量 | 综合能力 |
QPS = 并发数 / 平均响应时间
例如:
- 并发 100,平均 RT 200ms
- QPS = 100 / 0.2 = 500线程池配置
线程池是并发的基础,配置错了什么都白搭。
CPU 密集型
java
public class CPUIntensiveConfig {
public static ExecutorService createPool() {
int cores = Runtime.getRuntime().availableProcessors();
// CPU 密集型:线程数 = CPU 核心数
return new ThreadPoolExecutor(
cores,
cores,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(100),
new ThreadPoolExecutor.AbortPolicy()
);
}
public static void main(String[] args) {
ExecutorService pool = createPool();
for (int i = 0; i < 100; i++) {
pool.submit(() -> {
// CPU 密集计算:压缩、加解密、复杂计算
compute();
});
}
pool.shutdown();
}
private static void compute() {
// 模拟 CPU 密集任务
long sum = 0;
for (int i = 0; i < 10_000_000; i++) {
sum += i;
}
}
}I/O 密集型
java
public class IOIntensiveConfig {
public static ExecutorService createPool() {
int cores = Runtime.getRuntime().availableProcessors();
// I/O 密集型:线程数 = CPU 核心数 * (1 + 等待时间/计算时间)
// 通常设为 CPU 核心数的 2-4 倍
int threads = cores * 4;
return new ThreadPoolExecutor(
cores * 2, // 核心线程数
threads, // 最大线程数
60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
}
}选择公式
线程数 = CPU 核心数 × CPU 利用率 × (1 + 等待时间 / 计算时间)
等待时间:I/O 操作、网络请求等
计算时间:CPU 计算时间并发计数器
高并发下的计数器,普通 AtomicLong 会有竞争问题。
LongAdder:分段累加
java
public class ConcurrentCounter {
// ❌ 普通 AtomicLong:高并发下竞争激烈
private static final AtomicLong counter1 = new AtomicLong();
// ✅ LongAdder:分段累加,高并发性能更好
private static final LongAdder counter2 = new LongAdder();
public static void main(String[] args) throws InterruptedException {
int threads = 100;
int opsPerThread = 10000;
ExecutorService executor = Executors.newFixedThreadPool(threads);
// AtomicLong 测试
long start = System.nanoTime();
for (int i = 0; i < threads; i++) {
executor.submit(() -> {
for (int j = 0; j < opsPerThread; j++) {
counter1.incrementAndGet();
}
});
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("AtomicLong: " + counter1.get());
// LongAdder 测试(更高性能)
}
}并发 Map 统计
java
public class ConcurrentStats {
private static final ConcurrentHashMap<String, LongAdder> stats =
new ConcurrentHashMap<>();
public static void main(String[] args) {
// 高性能统计:PV、UV、接口调用次数
increment("page:/home");
increment("page:/home");
increment("page:/product");
increment("user:1001");
increment("user:1001");
stats.forEach((key, adder) ->
System.out.println(key + ": " + adder.sum()));
}
public static void increment(String key) {
stats.computeIfAbsent(key, k -> new LongAdder()).increment();
}
}并发缓存
减少重复计算,降低后端压力。
Cache-Aside 模式
java
public class CacheAsidePattern {
private static final ConcurrentHashMap<String, Object> cache =
new ConcurrentHashMap<>();
// 模拟数据库
private static Map<String, String> db = Map.of(
"user:1", "张三",
"user:2", "李四"
);
public static void main(String[] args) {
// 缓存命中
System.out.println(get("user:1")); // 缓存返回
// 缓存未命中,查询数据库
System.out.println(get("user:2")); // 从 DB 加载
}
public static String get(String key) {
// 1. 先查缓存
Object value = cache.get(key);
if (value != null) {
System.out.println("Cache hit: " + key);
return (String) value;
}
// 2. 缓存未命中,查数据库
String dbValue = db.get(key);
if (dbValue != null) {
// 3. 写入缓存
cache.put(key, dbValue);
System.out.println("Cache miss, load from DB: " + key);
}
return dbValue;
}
}异步写入缓存
java
public class AsyncCacheUpdate {
private static final ConcurrentHashMap<String, String> cache =
new ConcurrentHashMap<>();
// 写操作:先更新缓存,再异步更新数据库
public static void set(String key, String value) {
// 1. 立即更新缓存
cache.put(key, value);
// 2. 异步更新数据库
CompletableFuture.runAsync(() -> {
// db.update(key, value);
System.out.println("Async DB update: " + key);
});
}
}连接池管理
数据库连接、HTTP 连接、线程池……都是稀缺资源。
HikariCP 配置
java
public class HikariCPConfig {
public static DataSource createDataSource() {
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/demo");
config.setUsername("root");
config.setPassword("password");
// 核心配置
config.setMaximumPoolSize(20); // 最大连接数
config.setMinimumIdle(5); // 最小空闲连接
config.setIdleTimeout(300000); // 空闲超时 5 分钟
config.setConnectionTimeout(30000); // 获取连接超时 30 秒
config.setMaxLifetime(1800000); // 连接最大生命周期 30 分钟
// 连接测试
config.setConnectionTestQuery("SELECT 1");
return new HikariDataSource(config);
}
}HTTP 连接池
java
public class HttpClientConfig {
public static CloseableHttpClient createHttpClient() {
// 连接池配置
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(3000) // 连接超时
.setSocketTimeout(5000) // 读取超时
.setConnectionRequestTimeout(2000) // 连接获取超时
.build();
// 最大连接数
PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
cm.setMaxTotal(200);
cm.setDefaultMaxPerRoute(20);
return HttpClients.custom()
.setConnectionManager(cm)
.setDefaultRequestConfig(requestConfig)
.build();
}
}削峰填谷
保护系统不被突发流量冲垮。
限流策略
java
public class RateLimiter {
// 令牌桶限流
private static final RateLimiter limiter = RateLimiter.create(1000); // 每秒 1000 个令牌
public static void main(String[] args) {
for (int i = 0; i < 1100; i++) {
// 阻塞等待获取令牌
limiter.acquire();
process(i);
}
}
private static void process(int i) {
// 业务处理
}
}异步处理
java
public class AsyncProcessing {
private static final ExecutorService executor =
new ThreadPoolExecutor(10, 50, 60L, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(10000),
new ThreadPoolExecutor.CallerRunsPolicy());
public static CompletableFuture<String> processAsync(String task) {
return CompletableFuture.supplyAsync(() -> {
// 实际处理
return "Processed: " + task;
}, executor);
}
public static void main(String[] args) {
// 提交 10000 个任务
for (int i = 0; i < 10000; i++) {
processAsync("task-" + i)
.thenAccept(result -> System.out.println(result));
}
}
}监控指标
高并发系统,必须有完善的监控。
java
public class MetricsDemo {
private static final ConcurrentHashMap<String, Timer> timers =
new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Meter> meters =
new ConcurrentHashMap<>();
public static <T> T timed(String operation, Supplier<T> supplier) {
Timer timer = timers.computeIfAbsent(operation, k -> new Timer());
long start = System.nanoTime();
try {
return supplier.get();
} finally {
timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
}
}
public static void mark(String operation) {
meters.computeIfAbsent(operation, k -> new Meter()).mark();
}
public static void printStats() {
System.out.println("=== 性能统计 ===");
timers.forEach((op, timer) ->
System.out.printf("%s: avg=%.2fms, max=%.2fms%n",
op, timer.getAverage() / 1_000_000,
timer.getMax() / 1_000_000));
}
}常见问题处理
| 问题 | 原因 | 解决方案 |
|---|---|---|
| CPU 100% | 线程数过多、死循环 | 减少线程、JVM 调优 |
| 响应时间长 | I/O 阻塞、锁竞争 | 异步 I/O、减少锁粒度 |
| 内存持续增长 | 缓存无界、内存泄漏 | 限制缓存大小、TTL |
| 线程池拒绝 | 任务堆积、线程数不足 | 调整队列和线程数 |
要点回顾
- 线程数公式:CPU 密集型 = 核心数,I/O 密集型 = 核心数 × (1 + 等待时间/计算时间)
- 计数器:高并发用 LongAdder,普通场景用 AtomicLong
- 缓存:Cache-Aside 是最实用的模式
- 限流:保护系统不被冲垮
- 监控:QPS、RT、错误率是核心指标
