Skip to content

Exchanger

有些场景下,两个线程需要交换数据。

比如:生产者给消费者一个产品,消费者给生产者一个空碗。

Exchanger 就是干这个的。

核心思想

Exchanger(交换器)允许两个线程交换数据:

线程A:    数据A ──► exchange() ──► 获得数据B
线程B:    数据B ──► exchange() ──► 获得数据A

两个线程都调用 exchange() 后,数据交换才发生。

代码演示

基础用法

java
public class ExchangerDemo {

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();

        Thread threadA = new Thread(() -> {
            try {
                String dataA = "A的数据";
                System.out.println("线程A 准备好数据: " + dataA);
                Thread.sleep(1000);  // 模拟处理时间
                String received = exchanger.exchange(dataA);
                System.out.println("线程A 收到数据: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ThreadA");

        Thread threadB = new Thread(() -> {
            try {
                String dataB = "B的数据";
                System.out.println("线程B 准备好数据: " + dataB);
                Thread.sleep(500);  // 模拟处理时间
                String received = exchanger.exchange(dataB);
                System.out.println("线程B 收到数据: " + received);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "ThreadB");

        threadA.start();
        threadB.start();
    }
}

带超时的交换

java
public class ExchangerWithTimeout {

    public static void main(String[] args) {
        Exchanger<String> exchanger = new Exchanger<>();
        String data = "超时测试数据";

        try {
            System.out.println("开始交换,等待 3 秒...");
            String received = exchanger.exchange(data, 3, TimeUnit.SECONDS);
            System.out.println("收到数据: " + received);
        } catch (TimeoutException e) {
            System.out.println("交换超时!对方线程可能不存在");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

实际应用:双缓存数据交换

java
public class DoubleBufferExchanger {

    private static final Exchanger<Buffer> exchanger = new Exchanger<>();
    private static Buffer writeBuffer = new Buffer("初始写入数据");
    private static Buffer readBuffer = null;

    public static void main(String[] args) throws InterruptedException {
        // 写入线程
        Thread writer = new Thread(() -> {
            int count = 0;
            while (count < 3) {
                try {
                    count++;
                    writeBuffer = new Buffer("数据批次 " + count);
                    System.out.println("[写入] 准备写入: " + writeBuffer);
                    System.out.println("[写入] 交换缓冲区...");
                    writeBuffer = exchanger.exchange(writeBuffer);
                    System.out.println("[写入] 交换完成,获取到: " + writeBuffer);
                    Thread.sleep(500);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Writer");

        // 读取线程
        Thread reader = new Thread(() -> {
            int count = 0;
            while (count < 3) {
                try {
                    Thread.sleep(300);
                    System.out.println("[读取] 交换缓冲区...");
                    readBuffer = exchanger.exchange(readBuffer);
                    System.out.println("[读取] 获取数据: " + readBuffer);
                    count++;
                    Thread.sleep(200);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }, "Reader");

        writer.start();
        reader.start();
        writer.join();
        reader.join();
        System.out.println("双缓存交换演示完成");
    }

    static class Buffer {
        private final String data;

        public Buffer(String data) {
            this.data = data;
        }

        @Override
        public String toString() {
            return "Buffer{" + data + "}";
        }
    }
}

实际应用:生产者-消费者

java
public class ProducerConsumerExchanger {

    private static final Exchanger<馒头> exchanger = new Exchanger<>();
    private static final Random random = new Random();

    public static void main(String[] args) {
        Thread producer = new Thread(new Producer(), "Producer");
        Thread consumer = new Thread(new Consumer(), "Consumer");

        producer.start();
        consumer.start();
    }

    static class Producer implements Runnable {
        private int count = 0;

        @Override
        public void run() {
            while (count < 5) {
                try {
                    count++;
                    馒头 bun = new 馒头(count);
                    System.out.println("[生产者] 制造馒头 #" + bun.id);

                    Thread.sleep(random.nextInt(500) + 200);

                    // 交换:给出馒头,换回空碗(或上一个馒头)
                    馒头 exchanged = exchanger.exchange(bun);
                    System.out.println("[生产者] 换回: " + exchanged);

                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            System.out.println("[生产者] 生产完成");
        }
    }

    static class Consumer implements Runnable {
        private int eaten = 0;

        @Override
        public void run() {
            while (eaten < 5) {
                try {
                    Thread.sleep(random.nextInt(300) + 100);

                    // 交换:给出空碗,换回馒头
                    馒头 bun = exchanger.exchange(new 馒头(0));
                    if (bun.id > 0) {
                        eaten++;
                        System.out.println("[消费者] 吃掉了馒头 #" + bun.id + " (累计: " + eaten + ")");
                    }

                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            System.out.println("[消费者] 吃完了");
        }
    }

    static class 馒头 {
        int id;

        public 馒头(int id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return id > 0 ? "馒头#" + id : "空碗";
        }
    }
}

方法一览

方法说明
Exchanger<V>()创建交换器
exchange(V v)阻塞等待配对线程,交换数据
exchange(V v, long timeout, TimeUnit)等待指定时间,超时抛出 TimeoutException

注意事项

  1. 需要配对线程:Exchanger 需要两个线程同时调用 exchange() 才能交换数据。如果只有一个线程调用,会一直等待
  2. 死锁风险:如果一个线程交换后不再继续,可能导致另一个线程永远等待
  3. 超时处理:使用带超时的 exchange() 方法可以避免无限等待
  4. 单次使用:每次 exchange() 调用后,需要重新调用才能再次交换
java
// 使用超时避免死锁
try {
    String result = exchanger.exchange(data, 5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
    System.out.println("对方线程未到达,取消交换");
    // 处理超时情况
}

要点回顾

Exchanger 适用于两个线程间需要直接交换数据的场景:

  • 生产者-消费者模式中的数据传递
  • 双缓存交换
  • 遗传算法中的基因交换

记住:成对使用,必须两个线程同时调用。

基于 VitePress 构建