Exchanger
有些场景下,两个线程需要交换数据。
比如:生产者给消费者一个产品,消费者给生产者一个空碗。
Exchanger 就是干这个的。
核心思想
Exchanger(交换器)允许两个线程交换数据:
线程A: 数据A ──► exchange() ──► 获得数据B
线程B: 数据B ──► exchange() ──► 获得数据A两个线程都调用 exchange() 后,数据交换才发生。
代码演示
基础用法
java
public class ExchangerDemo {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
Thread threadA = new Thread(() -> {
try {
String dataA = "A的数据";
System.out.println("线程A 准备好数据: " + dataA);
Thread.sleep(1000); // 模拟处理时间
String received = exchanger.exchange(dataA);
System.out.println("线程A 收到数据: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "ThreadA");
Thread threadB = new Thread(() -> {
try {
String dataB = "B的数据";
System.out.println("线程B 准备好数据: " + dataB);
Thread.sleep(500); // 模拟处理时间
String received = exchanger.exchange(dataB);
System.out.println("线程B 收到数据: " + received);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}, "ThreadB");
threadA.start();
threadB.start();
}
}带超时的交换
java
public class ExchangerWithTimeout {
public static void main(String[] args) {
Exchanger<String> exchanger = new Exchanger<>();
String data = "超时测试数据";
try {
System.out.println("开始交换,等待 3 秒...");
String received = exchanger.exchange(data, 3, TimeUnit.SECONDS);
System.out.println("收到数据: " + received);
} catch (TimeoutException e) {
System.out.println("交换超时!对方线程可能不存在");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}实际应用:双缓存数据交换
java
public class DoubleBufferExchanger {
private static final Exchanger<Buffer> exchanger = new Exchanger<>();
private static Buffer writeBuffer = new Buffer("初始写入数据");
private static Buffer readBuffer = null;
public static void main(String[] args) throws InterruptedException {
// 写入线程
Thread writer = new Thread(() -> {
int count = 0;
while (count < 3) {
try {
count++;
writeBuffer = new Buffer("数据批次 " + count);
System.out.println("[写入] 准备写入: " + writeBuffer);
System.out.println("[写入] 交换缓冲区...");
writeBuffer = exchanger.exchange(writeBuffer);
System.out.println("[写入] 交换完成,获取到: " + writeBuffer);
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "Writer");
// 读取线程
Thread reader = new Thread(() -> {
int count = 0;
while (count < 3) {
try {
Thread.sleep(300);
System.out.println("[读取] 交换缓冲区...");
readBuffer = exchanger.exchange(readBuffer);
System.out.println("[读取] 获取数据: " + readBuffer);
count++;
Thread.sleep(200);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
}, "Reader");
writer.start();
reader.start();
writer.join();
reader.join();
System.out.println("双缓存交换演示完成");
}
static class Buffer {
private final String data;
public Buffer(String data) {
this.data = data;
}
@Override
public String toString() {
return "Buffer{" + data + "}";
}
}
}实际应用:生产者-消费者
java
public class ProducerConsumerExchanger {
private static final Exchanger<馒头> exchanger = new Exchanger<>();
private static final Random random = new Random();
public static void main(String[] args) {
Thread producer = new Thread(new Producer(), "Producer");
Thread consumer = new Thread(new Consumer(), "Consumer");
producer.start();
consumer.start();
}
static class Producer implements Runnable {
private int count = 0;
@Override
public void run() {
while (count < 5) {
try {
count++;
馒头 bun = new 馒头(count);
System.out.println("[生产者] 制造馒头 #" + bun.id);
Thread.sleep(random.nextInt(500) + 200);
// 交换:给出馒头,换回空碗(或上一个馒头)
馒头 exchanged = exchanger.exchange(bun);
System.out.println("[生产者] 换回: " + exchanged);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
System.out.println("[生产者] 生产完成");
}
}
static class Consumer implements Runnable {
private int eaten = 0;
@Override
public void run() {
while (eaten < 5) {
try {
Thread.sleep(random.nextInt(300) + 100);
// 交换:给出空碗,换回馒头
馒头 bun = exchanger.exchange(new 馒头(0));
if (bun.id > 0) {
eaten++;
System.out.println("[消费者] 吃掉了馒头 #" + bun.id + " (累计: " + eaten + ")");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
break;
}
}
System.out.println("[消费者] 吃完了");
}
}
static class 馒头 {
int id;
public 馒头(int id) {
this.id = id;
}
@Override
public String toString() {
return id > 0 ? "馒头#" + id : "空碗";
}
}
}方法一览
| 方法 | 说明 |
|---|---|
Exchanger<V>() | 创建交换器 |
exchange(V v) | 阻塞等待配对线程,交换数据 |
exchange(V v, long timeout, TimeUnit) | 等待指定时间,超时抛出 TimeoutException |
注意事项
- 需要配对线程:Exchanger 需要两个线程同时调用
exchange()才能交换数据。如果只有一个线程调用,会一直等待 - 死锁风险:如果一个线程交换后不再继续,可能导致另一个线程永远等待
- 超时处理:使用带超时的
exchange()方法可以避免无限等待 - 单次使用:每次
exchange()调用后,需要重新调用才能再次交换
java
// 使用超时避免死锁
try {
String result = exchanger.exchange(data, 5, TimeUnit.SECONDS);
} catch (TimeoutException e) {
System.out.println("对方线程未到达,取消交换");
// 处理超时情况
}要点回顾
Exchanger 适用于两个线程间需要直接交换数据的场景:
- 生产者-消费者模式中的数据传递
- 双缓存交换
- 遗传算法中的基因交换
记住:成对使用,必须两个线程同时调用。
