Skip to content

高并发实践

并发编程,最终要落在「高并发」三个字上。

QPS 怎么提升?线程数怎么配置?锁怎么减少?缓存怎么用?本文聚焦实战,帮你把并发知识转化为生产级代码。

核心性能指标

理解这些指标,是高并发优化的基础:

指标含义优化方向
QPS每秒查询数吞吐量
TPS每秒事务数吞吐量
RT响应时间延迟
并发数同时处理的请求数并发能力
吞吐量单位时间处理的数据量综合能力
QPS = 并发数 / 平均响应时间

例如:
- 并发 100,平均 RT 200ms
- QPS = 100 / 0.2 = 500

线程池配置

线程池是并发的基础,配置错了什么都白搭。

CPU 密集型

java
public class CPUIntensiveConfig {

    public static ExecutorService createPool() {
        int cores = Runtime.getRuntime().availableProcessors();
        // CPU 密集型:线程数 = CPU 核心数
        return new ThreadPoolExecutor(
            cores,
            cores,
            0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(100),
            new ThreadPoolExecutor.AbortPolicy()
        );
    }

    public static void main(String[] args) {
        ExecutorService pool = createPool();

        for (int i = 0; i < 100; i++) {
            pool.submit(() -> {
                // CPU 密集计算:压缩、加解密、复杂计算
                compute();
            });
        }

        pool.shutdown();
    }

    private static void compute() {
        // 模拟 CPU 密集任务
        long sum = 0;
        for (int i = 0; i < 10_000_000; i++) {
            sum += i;
        }
    }
}

I/O 密集型

java
public class IOIntensiveConfig {

    public static ExecutorService createPool() {
        int cores = Runtime.getRuntime().availableProcessors();
        // I/O 密集型:线程数 = CPU 核心数 * (1 + 等待时间/计算时间)
        // 通常设为 CPU 核心数的 2-4 倍
        int threads = cores * 4;
        return new ThreadPoolExecutor(
            cores * 2,       // 核心线程数
            threads,          // 最大线程数
            60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(1000),
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }
}

选择公式

线程数 = CPU 核心数 × CPU 利用率 × (1 + 等待时间 / 计算时间)

等待时间:I/O 操作、网络请求等
计算时间:CPU 计算时间

并发计数器

高并发下的计数器,普通 AtomicLong 会有竞争问题。

LongAdder:分段累加

java
public class ConcurrentCounter {

    // ❌ 普通 AtomicLong:高并发下竞争激烈
    private static final AtomicLong counter1 = new AtomicLong();

    // ✅ LongAdder:分段累加,高并发性能更好
    private static final LongAdder counter2 = new LongAdder();

    public static void main(String[] args) throws InterruptedException {
        int threads = 100;
        int opsPerThread = 10000;
        ExecutorService executor = Executors.newFixedThreadPool(threads);

        // AtomicLong 测试
        long start = System.nanoTime();
        for (int i = 0; i < threads; i++) {
            executor.submit(() -> {
                for (int j = 0; j < opsPerThread; j++) {
                    counter1.incrementAndGet();
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.MINUTES);
        System.out.println("AtomicLong: " + counter1.get());

        // LongAdder 测试(更高性能)
    }
}

并发 Map 统计

java
public class ConcurrentStats {

    private static final ConcurrentHashMap<String, LongAdder> stats =
        new ConcurrentHashMap<>();

    public static void main(String[] args) {
        // 高性能统计:PV、UV、接口调用次数
        increment("page:/home");
        increment("page:/home");
        increment("page:/product");
        increment("user:1001");
        increment("user:1001");

        stats.forEach((key, adder) ->
            System.out.println(key + ": " + adder.sum()));
    }

    public static void increment(String key) {
        stats.computeIfAbsent(key, k -> new LongAdder()).increment();
    }
}

并发缓存

减少重复计算,降低后端压力。

Cache-Aside 模式

java
public class CacheAsidePattern {

    private static final ConcurrentHashMap<String, Object> cache =
        new ConcurrentHashMap<>();

    // 模拟数据库
    private static Map<String, String> db = Map.of(
        "user:1", "张三",
        "user:2", "李四"
    );

    public static void main(String[] args) {
        // 缓存命中
        System.out.println(get("user:1"));  // 缓存返回

        // 缓存未命中,查询数据库
        System.out.println(get("user:2"));  // 从 DB 加载
    }

    public static String get(String key) {
        // 1. 先查缓存
        Object value = cache.get(key);
        if (value != null) {
            System.out.println("Cache hit: " + key);
            return (String) value;
        }

        // 2. 缓存未命中,查数据库
        String dbValue = db.get(key);
        if (dbValue != null) {
            // 3. 写入缓存
            cache.put(key, dbValue);
            System.out.println("Cache miss, load from DB: " + key);
        }

        return dbValue;
    }
}

异步写入缓存

java
public class AsyncCacheUpdate {

    private static final ConcurrentHashMap<String, String> cache =
        new ConcurrentHashMap<>();

    // 写操作:先更新缓存,再异步更新数据库
    public static void set(String key, String value) {
        // 1. 立即更新缓存
        cache.put(key, value);

        // 2. 异步更新数据库
        CompletableFuture.runAsync(() -> {
            // db.update(key, value);
            System.out.println("Async DB update: " + key);
        });
    }
}

连接池管理

数据库连接、HTTP 连接、线程池……都是稀缺资源。

HikariCP 配置

java
public class HikariCPConfig {

    public static DataSource createDataSource() {
        HikariConfig config = new HikariConfig();
        config.setJdbcUrl("jdbc:mysql://localhost:3306/demo");
        config.setUsername("root");
        config.setPassword("password");

        // 核心配置
        config.setMaximumPoolSize(20);   // 最大连接数
        config.setMinimumIdle(5);        // 最小空闲连接
        config.setIdleTimeout(300000);  // 空闲超时 5 分钟
        config.setConnectionTimeout(30000);  // 获取连接超时 30 秒
        config.setMaxLifetime(1800000);     // 连接最大生命周期 30 分钟

        // 连接测试
        config.setConnectionTestQuery("SELECT 1");

        return new HikariDataSource(config);
    }
}

HTTP 连接池

java
public class HttpClientConfig {

    public static CloseableHttpClient createHttpClient() {
        // 连接池配置
        RequestConfig requestConfig = RequestConfig.custom()
            .setConnectTimeout(3000)      // 连接超时
            .setSocketTimeout(5000)        // 读取超时
            .setConnectionRequestTimeout(2000)  // 连接获取超时
            .build();

        // 最大连接数
        PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();
        cm.setMaxTotal(200);
        cm.setDefaultMaxPerRoute(20);

        return HttpClients.custom()
            .setConnectionManager(cm)
            .setDefaultRequestConfig(requestConfig)
            .build();
    }
}

削峰填谷

保护系统不被突发流量冲垮。

限流策略

java
public class RateLimiter {

    // 令牌桶限流
    private static final RateLimiter limiter = RateLimiter.create(1000); // 每秒 1000 个令牌

    public static void main(String[] args) {
        for (int i = 0; i < 1100; i++) {
            // 阻塞等待获取令牌
            limiter.acquire();
            process(i);
        }
    }

    private static void process(int i) {
        // 业务处理
    }
}

异步处理

java
public class AsyncProcessing {

    private static final ExecutorService executor =
        new ThreadPoolExecutor(10, 50, 60L, TimeUnit.SECONDS,
            new LinkedBlockingQueue<>(10000),
            new ThreadPoolExecutor.CallerRunsPolicy());

    public static CompletableFuture<String> processAsync(String task) {
        return CompletableFuture.supplyAsync(() -> {
            // 实际处理
            return "Processed: " + task;
        }, executor);
    }

    public static void main(String[] args) {
        // 提交 10000 个任务
        for (int i = 0; i < 10000; i++) {
            processAsync("task-" + i)
                .thenAccept(result -> System.out.println(result));
        }
    }
}

监控指标

高并发系统,必须有完善的监控。

java
public class MetricsDemo {

    private static final ConcurrentHashMap<String, Timer> timers =
        new ConcurrentHashMap<>();

    private static final ConcurrentHashMap<String, Meter> meters =
        new ConcurrentHashMap<>();

    public static <T> T timed(String operation, Supplier<T> supplier) {
        Timer timer = timers.computeIfAbsent(operation, k -> new Timer());
        long start = System.nanoTime();
        try {
            return supplier.get();
        } finally {
            timer.record(System.nanoTime() - start, TimeUnit.NANOSECONDS);
        }
    }

    public static void mark(String operation) {
        meters.computeIfAbsent(operation, k -> new Meter()).mark();
    }

    public static void printStats() {
        System.out.println("=== 性能统计 ===");
        timers.forEach((op, timer) ->
            System.out.printf("%s: avg=%.2fms, max=%.2fms%n",
                op, timer.getAverage() / 1_000_000,
                timer.getMax() / 1_000_000));
    }
}

常见问题处理

问题原因解决方案
CPU 100%线程数过多、死循环减少线程、JVM 调优
响应时间长I/O 阻塞、锁竞争异步 I/O、减少锁粒度
内存持续增长缓存无界、内存泄漏限制缓存大小、TTL
线程池拒绝任务堆积、线程数不足调整队列和线程数

要点回顾

  • 线程数公式:CPU 密集型 = 核心数,I/O 密集型 = 核心数 × (1 + 等待时间/计算时间)
  • 计数器:高并发用 LongAdder,普通场景用 AtomicLong
  • 缓存:Cache-Aside 是最实用的模式
  • 限流:保护系统不被冲垮
  • 监控:QPS、RT、错误率是核心指标

相关链接

基于 VitePress 构建