Skip to content

线程间数据传递

数据怎么从一个线程「交给」另一个线程。

方式总览

方式特点适用场景
共享变量 + volatile简单单个值传递
BlockingQueue安全、易用生产者-消费者
Future异步结果一个线程返回结果
CompletableFuture链式调用复杂异步流程
ThreadLocal线程隔离上下文传递
AtomicReference无锁传递单个对象

方式一:共享变量 + volatile

最简单的单向传递:

java
public class SharedVariableDemo {

    private volatile boolean ready = false;
    private volatile int value = 0;

    public static void main(String[] args) throws InterruptedException {
        SharedVariableDemo demo = new SharedVariableDemo();

        Thread writer = new Thread(() -> {
            value = 42;
            ready = true;
            System.out.println("写入完成: value=" + value);
        }, "Writer");

        Thread reader = new Thread(() -> {
            while (!ready) {
                Thread.yield();
            }
            System.out.println("读取到: value=" + value);
        }, "Reader");

        reader.start();
        Thread.sleep(100);  // 确保 reader 先启动
        writer.start();
    }
}

注意:volatile 只保证可见性,不保证原子性。

方式二:BlockingQueue

线程安全的消息队列:

java
public class QueueTransfer {

    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    public static void main(String[] args) throws InterruptedException {
        QueueTransfer demo = new QueueTransfer();
        demo.start();
    }

    public void start() throws InterruptedException {
        // 发送者
        Thread sender = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                try {
                    queue.put("消息-" + i);
                    System.out.println("发送: 消息-" + i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
            try {
                queue.put("END");  // 结束标记
            } catch (InterruptedException e) {}
        }, "Sender");

        // 接收者
        Thread receiver = new Thread(() -> {
            while (true) {
                try {
                    String msg = queue.take();  // 阻塞直到有消息
                    if ("END".equals(msg)) break;
                    System.out.println("接收: " + msg);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "Receiver");

        sender.start();
        receiver.start();
        sender.join();
        receiver.join();
    }
}

方式三:Future

主线程等待子线程返回结果:

java
public class FutureDemo {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(2);

        // 提交任务,获取 Future
        Future<Integer> future = executor.submit(() -> {
            System.out.println("开始计算...");
            Thread.sleep(2000);
            return 42;
        });

        System.out.println("主线程继续干活...");
        Integer result = future.get();  // 阻塞等待结果
        System.out.println("收到结果: " + result);

        executor.shutdown();
    }
}

Future 常用方法

java
Future<T> future = executor.submit(callable);

// 阻塞获取结果
T result = future.get();

// 阻塞等待,带超时
T result = future.get(5, TimeUnit.SECONDS);

// 检查是否完成
boolean done = future.isDone();

// 取消任务
boolean cancelled = future.cancel(true);

// 检查是否取消
boolean isCancelled = future.isCancelled();

方式四:CompletableFuture

支持链式调用,更灵活:

java
public class CompletableFutureDemo {

    public static void main(String[] args) throws Exception {
        CompletableFuture<String> future = CompletableFuture
            .supplyAsync(() -> {
                System.out.println("Step 1: 获取用户ID");
                return "user123";
            })
            .thenApply(userId -> {
                System.out.println("Step 2: 查询用户信息");
                return "UserInfo[" + userId + "]";
            })
            .thenApply(info -> {
                System.out.println("Step 3: 处理信息");
                return info.toUpperCase();
            })
            .exceptionally(ex -> {
                System.out.println("出错了: " + ex.getMessage());
                return "ERROR";
            });

        System.out.println("主线程可以干别的...");
        String result = future.join();  // 等待完成
        System.out.println("最终结果: " + result);
    }
}

CompletableFuture 常用组合

java
// 组合多个 Future
CompletableFuture.allOf(f1, f2, f3).join();

// 任一个完成
CompletableFuture.anyOf(f1, f2, f3).join();

// 异步回调
future.thenAccept(result -> System.out.println(result));

// 异步执行
CompletableFuture.runAsync(() -> doSomething());

方式五:ThreadLocal

每个线程有独立的数据副本:

java
public class ThreadLocalDemo {

    private static final ThreadLocal<String> threadLocal =
        ThreadLocal.withInitial(() -> "初始值");

    public static void main(String[] args) throws InterruptedException {
        // 主线程
        System.out.println("主线程: " + threadLocal.get());
        threadLocal.set("主线程值");

        // 子线程
        Thread t1 = new Thread(() -> {
            System.out.println("线程1: " + threadLocal.get());  // 初始值
            threadLocal.set("线程1值");
            System.out.println("线程1修改后: " + threadLocal.get());
        }, "T1");

        Thread t2 = new Thread(() -> {
            System.out.println("线程2: " + threadLocal.get());  // 初始值
            threadLocal.set("线程2值");
            System.out.println("线程2修改后: " + threadLocal.get());
        }, "T2");

        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("主线程最终: " + threadLocal.get());  // 主线程值
    }
}

注意:ThreadLocal 是线程隔离的,主线程改了,子线程不受影响。

方式六:AtomicReference

无锁方式传递对象引用:

java
public class AtomicRefDemo {

    private final AtomicReference<String> ref = new AtomicReference<>();

    public static void main(String[] args) throws InterruptedException {
        AtomicRefDemo demo = new AtomicRefDemo();

        Thread writer = new Thread(() -> {
            for (int i = 1; i <= 3; i++) {
                ref.set("数据-" + i);
                System.out.println("写入: 数据-" + i);
            }
            ref.set("END");
        }, "Writer");

        Thread reader = new Thread(() -> {
            while (true) {
                String data = ref.get();
                if ("END".equals(data)) break;
                if (data != null) {
                    System.out.println("读取: " + data);
                }
            }
        }, "Reader");

        writer.start();
        reader.start();
    }
}

实战:网页爬虫

用 CompletableFuture 模拟:

java
public class CrawlerDemo {

    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(5);

        List<String> urls = List.of(
            "https://example.com/1",
            "https://example.com/2",
            "https://example.com/3"
        );

        // 并发爬取
        List<CompletableFuture<String>> futures = urls.stream()
            .map(url -> CompletableFuture.supplyAsync(
                () -> crawl(url), executor
            ))
            .collect(Collectors.toList());

        // 等待所有结果
        CompletableFuture.allOf(
            futures.toArray(new CompletableFuture[0])
        ).join();

        // 打印结果
        for (int i = 0; i < urls.size(); i++) {
            String result = futures.get(i).join();
            System.out.println(urls.get(i) + " -> " + result);
        }

        executor.shutdown();
    }

    private static String crawl(String url) {
        // 模拟爬取
        try {
            Thread.sleep((long) (Math.random() * 1000));
        } catch (InterruptedException e) {}
        return "页面内容";
    }
}

选择建议

场景推荐方式
简单 flag 传递volatile
生产者-消费者BlockingQueue
获取异步结果Future
链式异步操作CompletableFuture
线程上下文ThreadLocal
无锁传对象AtomicReference

总结

  • 线程间传递数据有多种方式,各有适用场景
  • 简单场景用 volatile/AtomicReference
  • 生产者-消费者场景用 BlockingQueue
  • 异步结果用 Future/CompletableFuture
  • 根据场景选择合适的工具

基于 VitePress 构建