线程间数据传递
数据怎么从一个线程「交给」另一个线程。
方式总览
| 方式 | 特点 | 适用场景 |
|---|---|---|
| 共享变量 + volatile | 简单 | 单个值传递 |
| BlockingQueue | 安全、易用 | 生产者-消费者 |
| Future | 异步结果 | 一个线程返回结果 |
| CompletableFuture | 链式调用 | 复杂异步流程 |
| ThreadLocal | 线程隔离 | 上下文传递 |
| AtomicReference | 无锁传递 | 单个对象 |
方式一:共享变量 + volatile
最简单的单向传递:
java
public class SharedVariableDemo {
private volatile boolean ready = false;
private volatile int value = 0;
public static void main(String[] args) throws InterruptedException {
SharedVariableDemo demo = new SharedVariableDemo();
Thread writer = new Thread(() -> {
value = 42;
ready = true;
System.out.println("写入完成: value=" + value);
}, "Writer");
Thread reader = new Thread(() -> {
while (!ready) {
Thread.yield();
}
System.out.println("读取到: value=" + value);
}, "Reader");
reader.start();
Thread.sleep(100); // 确保 reader 先启动
writer.start();
}
}注意:volatile 只保证可见性,不保证原子性。
方式二:BlockingQueue
线程安全的消息队列:
java
public class QueueTransfer {
private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
public static void main(String[] args) throws InterruptedException {
QueueTransfer demo = new QueueTransfer();
demo.start();
}
public void start() throws InterruptedException {
// 发送者
Thread sender = new Thread(() -> {
for (int i = 1; i <= 3; i++) {
try {
queue.put("消息-" + i);
System.out.println("发送: 消息-" + i);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
try {
queue.put("END"); // 结束标记
} catch (InterruptedException e) {}
}, "Sender");
// 接收者
Thread receiver = new Thread(() -> {
while (true) {
try {
String msg = queue.take(); // 阻塞直到有消息
if ("END".equals(msg)) break;
System.out.println("接收: " + msg);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}, "Receiver");
sender.start();
receiver.start();
sender.join();
receiver.join();
}
}方式三:Future
主线程等待子线程返回结果:
java
public class FutureDemo {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(2);
// 提交任务,获取 Future
Future<Integer> future = executor.submit(() -> {
System.out.println("开始计算...");
Thread.sleep(2000);
return 42;
});
System.out.println("主线程继续干活...");
Integer result = future.get(); // 阻塞等待结果
System.out.println("收到结果: " + result);
executor.shutdown();
}
}Future 常用方法
java
Future<T> future = executor.submit(callable);
// 阻塞获取结果
T result = future.get();
// 阻塞等待,带超时
T result = future.get(5, TimeUnit.SECONDS);
// 检查是否完成
boolean done = future.isDone();
// 取消任务
boolean cancelled = future.cancel(true);
// 检查是否取消
boolean isCancelled = future.isCancelled();方式四:CompletableFuture
支持链式调用,更灵活:
java
public class CompletableFutureDemo {
public static void main(String[] args) throws Exception {
CompletableFuture<String> future = CompletableFuture
.supplyAsync(() -> {
System.out.println("Step 1: 获取用户ID");
return "user123";
})
.thenApply(userId -> {
System.out.println("Step 2: 查询用户信息");
return "UserInfo[" + userId + "]";
})
.thenApply(info -> {
System.out.println("Step 3: 处理信息");
return info.toUpperCase();
})
.exceptionally(ex -> {
System.out.println("出错了: " + ex.getMessage());
return "ERROR";
});
System.out.println("主线程可以干别的...");
String result = future.join(); // 等待完成
System.out.println("最终结果: " + result);
}
}CompletableFuture 常用组合
java
// 组合多个 Future
CompletableFuture.allOf(f1, f2, f3).join();
// 任一个完成
CompletableFuture.anyOf(f1, f2, f3).join();
// 异步回调
future.thenAccept(result -> System.out.println(result));
// 异步执行
CompletableFuture.runAsync(() -> doSomething());方式五:ThreadLocal
每个线程有独立的数据副本:
java
public class ThreadLocalDemo {
private static final ThreadLocal<String> threadLocal =
ThreadLocal.withInitial(() -> "初始值");
public static void main(String[] args) throws InterruptedException {
// 主线程
System.out.println("主线程: " + threadLocal.get());
threadLocal.set("主线程值");
// 子线程
Thread t1 = new Thread(() -> {
System.out.println("线程1: " + threadLocal.get()); // 初始值
threadLocal.set("线程1值");
System.out.println("线程1修改后: " + threadLocal.get());
}, "T1");
Thread t2 = new Thread(() -> {
System.out.println("线程2: " + threadLocal.get()); // 初始值
threadLocal.set("线程2值");
System.out.println("线程2修改后: " + threadLocal.get());
}, "T2");
t1.start();
t2.start();
t1.join();
t2.join();
System.out.println("主线程最终: " + threadLocal.get()); // 主线程值
}
}注意:ThreadLocal 是线程隔离的,主线程改了,子线程不受影响。
方式六:AtomicReference
无锁方式传递对象引用:
java
public class AtomicRefDemo {
private final AtomicReference<String> ref = new AtomicReference<>();
public static void main(String[] args) throws InterruptedException {
AtomicRefDemo demo = new AtomicRefDemo();
Thread writer = new Thread(() -> {
for (int i = 1; i <= 3; i++) {
ref.set("数据-" + i);
System.out.println("写入: 数据-" + i);
}
ref.set("END");
}, "Writer");
Thread reader = new Thread(() -> {
while (true) {
String data = ref.get();
if ("END".equals(data)) break;
if (data != null) {
System.out.println("读取: " + data);
}
}
}, "Reader");
writer.start();
reader.start();
}
}实战:网页爬虫
用 CompletableFuture 模拟:
java
public class CrawlerDemo {
public static void main(String[] args) throws Exception {
ExecutorService executor = Executors.newFixedThreadPool(5);
List<String> urls = List.of(
"https://example.com/1",
"https://example.com/2",
"https://example.com/3"
);
// 并发爬取
List<CompletableFuture<String>> futures = urls.stream()
.map(url -> CompletableFuture.supplyAsync(
() -> crawl(url), executor
))
.collect(Collectors.toList());
// 等待所有结果
CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0])
).join();
// 打印结果
for (int i = 0; i < urls.size(); i++) {
String result = futures.get(i).join();
System.out.println(urls.get(i) + " -> " + result);
}
executor.shutdown();
}
private static String crawl(String url) {
// 模拟爬取
try {
Thread.sleep((long) (Math.random() * 1000));
} catch (InterruptedException e) {}
return "页面内容";
}
}选择建议
| 场景 | 推荐方式 |
|---|---|
| 简单 flag 传递 | volatile |
| 生产者-消费者 | BlockingQueue |
| 获取异步结果 | Future |
| 链式异步操作 | CompletableFuture |
| 线程上下文 | ThreadLocal |
| 无锁传对象 | AtomicReference |
总结
- 线程间传递数据有多种方式,各有适用场景
- 简单场景用 volatile/AtomicReference
- 生产者-消费者场景用 BlockingQueue
- 异步结果用 Future/CompletableFuture
- 根据场景选择合适的工具
