DelayQueue
「让任务自己决定什么时候该被执行。」
DelayQueue 是一个支持延迟获取的无界阻塞队列。元素必须实现 Delayed 接口,只有当元素的延迟到期时,才能从队列中取出。
核心原理
┌──────────────────────────────────────────────────┐
│ DelayQueue 原理 │
├──────────────────────────────────────────────────┤
│ │
│ 内部结构:PriorityQueue(基于堆的优先级队列) │
│ │
│ 取出规则: │
│ ┌─────────────────────────────────────────┐ │
│ │ take() 时: │ │
│ │ 1. 查看堆顶元素是否到期 │ │
│ │ 2. 未到期 → 阻塞等待 │ │
│ │ 3. 已到期 → 返回元素并移除 │ │
│ └─────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────┐ │
│ │ poll() 时: │ │
│ │ 1. 查看堆顶元素是否到期 │ │
│ │ 2. 未到期 → 返回 null(不阻塞) │ │
│ │ 3. 已到期 → 返回元素并移除 │ │
│ └─────────────────────────────────────────┘ │
│ │
└──────────────────────────────────────────────────┘Delayed 接口
元素必须实现 Delayed 接口:
java
public interface Delayed extends Comparable<Delayed> {
// 返回剩余延迟时间,负数表示已到期
long getDelay(TimeUnit unit);
}继承自 Comparable,用于在队列中按到期时间排序。
基础用法
java
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
DelayQueue<DelayedTask> queue = new DelayQueue<>();
// 添加延迟任务(数字表示延迟毫秒数)
queue.put(new DelayedTask("任务1", 2000)); // 2秒后到期
queue.put(new DelayedTask("任务2", 1000)); // 1秒后到期
queue.put(new DelayedTask("任务3", 3000)); // 3秒后到期
System.out.println("任务已添加,等待到期...");
// 按到期顺序取出
while (!queue.isEmpty()) {
DelayedTask task = queue.take(); // 阻塞直到到期
System.out.println("任务到期: " + task.name);
}
}
static class DelayedTask implements Delayed {
String name;
long expireTime; // 到期时间戳
DelayedTask(String name, long delayMillis) {
this.name = name;
this.expireTime = System.currentTimeMillis() + delayMillis;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(
expireTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS
);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.expireTime, ((DelayedTask) o).expireTime);
}
}
}输出:
任务已添加,等待到期...
任务到期: 任务2 (1秒后)
任务到期: 任务1 (再过1秒)
任务到期: 任务3 (再过1秒)定时缓存
用 DelayQueue 实现带过期时间的缓存:
java
public class DelayCacheDemo {
private static final ConcurrentHashMap<String, CacheEntry> cache =
new ConcurrentHashMap<>();
public static void main(String[] args) throws InterruptedException {
// 放入缓存
put("token-1", "用户1的令牌", 2000);
put("token-2", "用户2的令牌", 1000);
System.out.println("初始: token-1=" + get("token-1")
+ ", token-2=" + get("token-2"));
Thread.sleep(1500);
System.out.println("1.5秒后: token-1=" + get("token-1")
+ ", token-2=" + get("token-2")); // token-2 应过期
Thread.sleep(1000);
System.out.println("2.5秒后: token-1=" + get("token-1")
+ ", token-2=" + get("token-2")); // token-1 也应过期
}
static class CacheEntry {
String value;
long expireTime;
CacheEntry(String value, long ttlMillis) {
this.value = value;
this.expireTime = System.currentTimeMillis() + ttlMillis;
}
boolean isExpired() {
return System.currentTimeMillis() > expireTime;
}
}
public static void put(String key, String value, long ttlMillis) {
cache.put(key, new CacheEntry(value, ttlMillis));
}
public static String get(String key) {
CacheEntry entry = cache.get(key);
if (entry == null) return null;
if (entry.isExpired()) {
cache.remove(key);
return null;
}
return entry.value;
}
}定时任务调度器
java
public class ScheduledTaskDemo {
private static final DelayQueue<ScheduledTask> taskQueue =
new DelayQueue<>();
public static void main(String[] args) {
// 添加定时任务
taskQueue.put(new ScheduledTask(() ->
System.out.println("定时任务A 执行: " + System.currentTimeMillis())
, 1000));
taskQueue.put(new ScheduledTask(() ->
System.out.println("定时任务B 执行: " + System.currentTimeMillis())
, 2000));
// 执行线程
Thread executor = new Thread(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
ScheduledTask task = taskQueue.take(); // 阻塞等待
task.run();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
});
executor.start();
}
static class ScheduledTask implements Delayed {
Runnable task;
long executeTime;
ScheduledTask(Runnable task, long delayMillis) {
this.task = task;
this.executeTime = System.currentTimeMillis() + delayMillis;
}
public void run() {
task.run();
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(
executeTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS
);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.executeTime, ((ScheduledTask) o).executeTime);
}
}
}与 ScheduledExecutorService 对比
| 特性 | DelayQueue | ScheduledExecutorService |
|---|---|---|
| 精确度 | 毫秒级 | 纳秒级 |
| 任务管理 | 手动管理 | 自动管理 |
| 重复执行 | 需要手动重新入队 | 支持固定速率/延迟 |
| 持久化 | 不支持 | 不支持 |
| 适用场景 | 简单延迟队列、定时缓存 | 复杂定时调度 |
注意事项
- 无界队列:
put()永不阻塞,可能无限增长 - 元素必须实现 Delayed:自定义类的基本要求
- 空队列 take() 会阻塞:直到有元素到期
- poll() 不阻塞:到期返回 null
- 不保证精确计时:依赖 getDelay() 的实现
常见错误
错误一:getDelay 返回错误值
java
// ❌ 错误:getDelay 返回固定值
public long getDelay(TimeUnit unit) {
return 1000; // 永远返回1秒
}
// ✅ 正确:返回相对当前时间的剩余延迟
public long getDelay(TimeUnit unit) {
return unit.convert(
expireTime - System.currentTimeMillis(),
TimeUnit.MILLISECONDS
);
}错误二:compareTo 实现错误
java
// ❌ 错误:忘记实现 compareTo
// DelayQueue 依赖它排序
// ✅ 正确:按到期时间排序
public int compareTo(Delayed o) {
return Long.compare(this.expireTime, ((DelayedTask) o).expireTime);
}要点回顾
- DelayQueue 是支持延迟获取的无界阻塞队列
- 元素必须实现
Delayed接口 take()阻塞直到有元素到期poll()不阻塞,到期返回 null- 适合:定时缓存、延迟任务、订单超时处理
相关链接
- 阻塞队列 - BlockingQueue 家族
- ConcurrentQueue - 非阻塞队列
- JUC 集合选择 - 根据场景选择集合
