Skip to content

DelayQueue

「让任务自己决定什么时候该被执行。」

DelayQueue 是一个支持延迟获取的无界阻塞队列。元素必须实现 Delayed 接口,只有当元素的延迟到期时,才能从队列中取出。

核心原理

┌──────────────────────────────────────────────────┐
│                  DelayQueue 原理                   │
├──────────────────────────────────────────────────┤
│                                                  │
│  内部结构:PriorityQueue(基于堆的优先级队列)      │
│                                                  │
│  取出规则:                                        │
│  ┌─────────────────────────────────────────┐   │
│  │ take() 时:                              │   │
│  │  1. 查看堆顶元素是否到期                  │   │
│  │  2. 未到期 → 阻塞等待                     │   │
│  │  3. 已到期 → 返回元素并移除               │   │
│  └─────────────────────────────────────────┘   │
│                                                  │
│  ┌─────────────────────────────────────────┐   │
│  │ poll() 时:                              │   │
│  │  1. 查看堆顶元素是否到期                  │   │
│  │  2. 未到期 → 返回 null(不阻塞)         │   │
│  │  3. 已到期 → 返回元素并移除               │   │
│  └─────────────────────────────────────────┘   │
│                                                  │
└──────────────────────────────────────────────────┘

Delayed 接口

元素必须实现 Delayed 接口:

java
public interface Delayed extends Comparable<Delayed> {
    // 返回剩余延迟时间,负数表示已到期
    long getDelay(TimeUnit unit);
}

继承自 Comparable,用于在队列中按到期时间排序。

基础用法

java
public class DelayQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        DelayQueue<DelayedTask> queue = new DelayQueue<>();

        // 添加延迟任务(数字表示延迟毫秒数)
        queue.put(new DelayedTask("任务1", 2000)); // 2秒后到期
        queue.put(new DelayedTask("任务2", 1000)); // 1秒后到期
        queue.put(new DelayedTask("任务3", 3000)); // 3秒后到期

        System.out.println("任务已添加,等待到期...");

        // 按到期顺序取出
        while (!queue.isEmpty()) {
            DelayedTask task = queue.take();  // 阻塞直到到期
            System.out.println("任务到期: " + task.name);
        }
    }

    static class DelayedTask implements Delayed {
        String name;
        long expireTime;  // 到期时间戳

        DelayedTask(String name, long delayMillis) {
            this.name = name;
            this.expireTime = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(
                expireTime - System.currentTimeMillis(),
                TimeUnit.MILLISECONDS
            );
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.expireTime, ((DelayedTask) o).expireTime);
        }
    }
}

输出:

任务已添加,等待到期...
任务到期: 任务2  (1秒后)
任务到期: 任务1  (再过1秒)
任务到期: 任务3  (再过1秒)

定时缓存

用 DelayQueue 实现带过期时间的缓存:

java
public class DelayCacheDemo {

    private static final ConcurrentHashMap<String, CacheEntry> cache =
        new ConcurrentHashMap<>();

    public static void main(String[] args) throws InterruptedException {
        // 放入缓存
        put("token-1", "用户1的令牌", 2000);
        put("token-2", "用户2的令牌", 1000);

        System.out.println("初始: token-1=" + get("token-1")
            + ", token-2=" + get("token-2"));

        Thread.sleep(1500);

        System.out.println("1.5秒后: token-1=" + get("token-1")
            + ", token-2=" + get("token-2"));  // token-2 应过期

        Thread.sleep(1000);

        System.out.println("2.5秒后: token-1=" + get("token-1")
            + ", token-2=" + get("token-2"));  // token-1 也应过期
    }

    static class CacheEntry {
        String value;
        long expireTime;

        CacheEntry(String value, long ttlMillis) {
            this.value = value;
            this.expireTime = System.currentTimeMillis() + ttlMillis;
        }

        boolean isExpired() {
            return System.currentTimeMillis() > expireTime;
        }
    }

    public static void put(String key, String value, long ttlMillis) {
        cache.put(key, new CacheEntry(value, ttlMillis));
    }

    public static String get(String key) {
        CacheEntry entry = cache.get(key);
        if (entry == null) return null;
        if (entry.isExpired()) {
            cache.remove(key);
            return null;
        }
        return entry.value;
    }
}

定时任务调度器

java
public class ScheduledTaskDemo {

    private static final DelayQueue<ScheduledTask> taskQueue =
        new DelayQueue<>();

    public static void main(String[] args) {
        // 添加定时任务
        taskQueue.put(new ScheduledTask(() ->
            System.out.println("定时任务A 执行: " + System.currentTimeMillis())
        , 1000));

        taskQueue.put(new ScheduledTask(() ->
            System.out.println("定时任务B 执行: " + System.currentTimeMillis())
        , 2000));

        // 执行线程
        Thread executor = new Thread(() -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    ScheduledTask task = taskQueue.take();  // 阻塞等待
                    task.run();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        });
        executor.start();
    }

    static class ScheduledTask implements Delayed {
        Runnable task;
        long executeTime;

        ScheduledTask(Runnable task, long delayMillis) {
            this.task = task;
            this.executeTime = System.currentTimeMillis() + delayMillis;
        }

        public void run() {
            task.run();
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(
                executeTime - System.currentTimeMillis(),
                TimeUnit.MILLISECONDS
            );
        }

        @Override
        public int compareTo(Delayed o) {
            return Long.compare(this.executeTime, ((ScheduledTask) o).executeTime);
        }
    }
}

与 ScheduledExecutorService 对比

特性DelayQueueScheduledExecutorService
精确度毫秒级纳秒级
任务管理手动管理自动管理
重复执行需要手动重新入队支持固定速率/延迟
持久化不支持不支持
适用场景简单延迟队列、定时缓存复杂定时调度

注意事项

  1. 无界队列put() 永不阻塞,可能无限增长
  2. 元素必须实现 Delayed:自定义类的基本要求
  3. 空队列 take() 会阻塞:直到有元素到期
  4. poll() 不阻塞:到期返回 null
  5. 不保证精确计时:依赖 getDelay() 的实现

常见错误

错误一:getDelay 返回错误值

java
// ❌ 错误:getDelay 返回固定值
public long getDelay(TimeUnit unit) {
    return 1000;  // 永远返回1秒
}

// ✅ 正确:返回相对当前时间的剩余延迟
public long getDelay(TimeUnit unit) {
    return unit.convert(
        expireTime - System.currentTimeMillis(),
        TimeUnit.MILLISECONDS
    );
}

错误二:compareTo 实现错误

java
// ❌ 错误:忘记实现 compareTo
// DelayQueue 依赖它排序

// ✅ 正确:按到期时间排序
public int compareTo(Delayed o) {
    return Long.compare(this.expireTime, ((DelayedTask) o).expireTime);
}

要点回顾

  • DelayQueue 是支持延迟获取的无界阻塞队列
  • 元素必须实现 Delayed 接口
  • take() 阻塞直到有元素到期
  • poll() 不阻塞,到期返回 null
  • 适合:定时缓存、延迟任务、订单超时处理

相关链接

基于 VitePress 构建