Semaphore
你有过这种经历吗?去游乐园玩,热门项目要排队。
因为同一时间只能有那么多人进去。
Semaphore 就是干这个的——控制同时访问某个资源的线程数量。
核心思想
Semaphore(信号量)维护一组许可(Permits)。
初始状态:3 个许可
┌───┬───┬───┐
│ 1 │ 1 │ 1 │ → 3 个许可可用
└───┴───┴───┘
线程 A 获取 1 个许可:
┌───┬───┬───┐
│ │ 1 │ 1 │ → 2 个许可可用
└───┴───┴───┘
线程 B 获取 1 个许可:
┌───┬───┬───┐
│ │ │ 1 │ → 1 个许可可用
└───┴───┴───┘
线程 C 获取 1 个许可:
┌───┬───┬───┐
│ │ │ │ → 0 个许可可用,线程 D 阻塞
└───┴───┴───┘
线程 A 释放 1 个许可:
┌───┬───┬───┐
│ │ │ 1 │ → 唤醒等待的线程
└───┴───┴───┘代码演示
基础用法
java
public class SemaphoreDemo {
public static void main(String[] args) {
int permitCount = 2; // 同时最多 2 个线程
Semaphore semaphore = new Semaphore(permitCount);
for (int i = 1; i <= 5; i++) {
final int threadId = i;
new Thread(() -> {
try {
System.out.println("线程 " + threadId + " 等待获取许可");
semaphore.acquire(); // 获取许可
System.out.println("线程 " + threadId + " 获取许可成功,开始执行");
Thread.sleep((long) (Math.random() * 1000 + 500));
System.out.println("线程 " + threadId + " 执行完成,释放许可");
semaphore.release(); // 释放许可
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}公平模式 vs 非公平模式
java
public class SemaphoreFairness {
public static void main(String[] args) throws InterruptedException {
System.out.println("=== 非公平模式 ===");
testSemaphore(false);
Thread.sleep(2000);
System.out.println("\n=== 公平模式 ===");
testSemaphore(true);
}
private static void testSemaphore(boolean fair) throws InterruptedException {
Semaphore semaphore = new Semaphore(1, fair);
for (int i = 1; i <= 3; i++) {
final int threadId = i;
new Thread(() -> {
try {
semaphore.acquire();
System.out.println("线程 " + threadId + " 获取许可");
Thread.sleep(500);
semaphore.release();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}
}公平模式按 FIFO 顺序获取许可,非公平模式可能插队。
实际应用:连接池模拟
java
public class ConnectionPoolDemo {
private static final int POOL_SIZE = 3; // 连接池大小
private static final Semaphore semaphore = new Semaphore(POOL_SIZE);
public static void main(String[] args) {
System.out.println("=== 数据库连接池模拟 ===\n");
for (int i = 1; i <= 6; i++) {
final int requestId = i;
new Thread(() -> {
Connection connection = getConnection();
try {
System.out.println("请求 " + requestId + " 获取连接: " + connection);
Thread.sleep((long) (Math.random() * 2000 + 500));
System.out.println("请求 " + requestId + " 释放连接");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
releaseConnection(connection);
}
}).start();
}
}
private static Connection getConnection() {
try {
semaphore.acquire();
return new Connection();
} catch (InterruptedException e) {
throw new RuntimeException("获取连接被中断");
}
}
private static void releaseConnection(Connection connection) {
connection.close();
semaphore.release();
}
static class Connection {
private static int count = 0;
private final int id = ++count;
@Override
public String toString() {
return "Connection-" + id;
}
public void close() {
// 实际场景中这里会关闭数据库连接
}
}
}实际应用:限流器
java
public class RateLimiterDemo {
private final Semaphore semaphore;
private final int maxRequests;
public RateLimiterDemo(int maxRequests) {
this.maxRequests = maxRequests;
this.semaphore = new Semaphore(maxRequests);
}
public void execute(Runnable task) throws InterruptedException {
semaphore.acquire();
try {
task.run();
} finally {
semaphore.release();
}
}
public static void main(String[] args) throws InterruptedException {
RateLimiterDemo limiter = new RateLimiterDemo(3); // 每批最多 3 个请求并发
System.out.println("=== 限流器测试(每批最多 3 个请求并发)===\n");
for (int batch = 1; batch <= 3; batch++) {
System.out.println("第 " + batch + " 批请求:");
for (int i = 1; i <= 5; i++) {
final int requestId = batch * 100 + i;
new Thread(() -> {
try {
limiter.execute(() -> {
System.out.println("请求 " + requestId + " 开始处理");
try {
Thread.sleep(300);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("请求 " + requestId + " 处理完成");
});
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
Thread.sleep(2000); // 每批间隔 2 秒
}
}
}多许可获取
java
public class SemaphoreMultiplePermits {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(3);
new Thread(() -> {
try {
System.out.println("需要 2 个许可");
semaphore.acquire(2); // 获取 2 个许可
System.out.println("获取成功,剩余许可: " + semaphore.availablePermits());
Thread.sleep(1000);
System.out.println("释放 2 个许可");
semaphore.release(2); // 释放 2 个许可
System.out.println("释放后剩余: " + semaphore.availablePermits());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}).start();
}
}tryAcquire() 非阻塞获取
java
public class SemaphoreTryAcquire {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i = 1; i <= 5; i++) {
final int threadId = i;
new Thread(() -> {
// 非阻塞尝试获取许可
if (semaphore.tryAcquire()) {
System.out.println("线程 " + threadId + " 获取许可成功");
try {
Thread.sleep(500);
} finally {
semaphore.release();
}
} else {
System.out.println("线程 " + threadId + " 获取许可失败(无可用许可)");
}
}).start();
}
}
}方法一览
| 方法 | 说明 |
|---|---|
Semaphore(int permits) | 创建指定许可数的信号量(非公平) |
Semaphore(int permits, boolean fair) | 创建公平或非公平信号量 |
acquire() | 获取一个许可,阻塞等待 |
acquire(int permits) | 获取指定数量的许可 |
tryAcquire() | 尝试获取,非阻塞 |
tryAcquire(long timeout, TimeUnit) | 超时尝试获取 |
release() | 释放一个许可 |
release(int permits) | 释放指定数量的许可 |
availablePermits() | 获取可用许可数 |
注意事项
- 许可数量:可以动态调整
- 公平性:公平模式按 FIFO 顺序分配,但性能略低
- 释放许可:必须在 finally 块中确保释放
- tryAcquire():非阻塞获取,立即返回
java
// ✅ 正确示例
Semaphore semaphore = new Semaphore(1);
try {
semaphore.acquire();
// 使用资源
} finally {
semaphore.release(); // 确保释放
}要点回顾
Semaphore 适用于需要控制并发访问数量的场景:
- 限流:限制同时访问的请求数
- 资源池:数据库连接池、线程池
- 并发控制:控制同时执行的操作数
记住:获取和释放要配对,通常在 finally 中释放。
