Skip to content

NIO 非阻塞通信

BIO 是这样做的:

java
// 每个连接一个线程
ServerSocket server = new ServerSocket(8080);
while (true) {
    Socket socket = server.accept(); // 阻塞
    new Thread(() -> {
        try {
            // 处理连接
        } catch (Exception e) {
            e.printStackTrace();
        }
    }).start();
}

100 个连接,100 个线程。1000 个连接,1000 个线程。线程是稀缺资源,这条路走不通。

NIO 换了一种思路:不创建线程,用一个 Selector 盯着所有连接,谁有数据谁处理。

完整实现:Echo 服务器

java
public class NIOEchoServer {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();

        // 1. 启动服务端
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.socket().bind(new InetSocketAddress(8080));
        serverChannel.configureBlocking(false);
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);

        System.out.println("NIO Echo 服务器启动,端口 8080");

        // 2. 事件循环
        while (true) {
            int readyChannels = selector.select(); // 阻塞等待
            if (readyChannels == 0) continue;

            Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
            while (iterator.hasNext()) {
                SelectionKey key = iterator.next();
                iterator.remove(); // 必须移除,否则重复处理

                if (key.isAcceptable()) {
                    handleAccept(key);
                } else if (key.isReadable()) {
                    handleRead(key);
                } else if (key.isWritable()) {
                    handleWrite(key);
                }
            }
        }
    }

    private static void handleAccept(SelectionKey key) throws IOException {
        ServerSocketChannel server = (ServerSocketChannel) key.channel();
        SocketChannel client = server.accept();
        client.configureBlocking(false);
        client.register(key.selector(), SelectionKey.OP_READ);
        System.out.println("客户端连接: " + client.getRemoteAddress());
    }

    private static void handleRead(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int read = client.read(buffer);

        if (read > 0) {
            buffer.flip();
            // 切换到写模式,等待可写时再发送
            key.interestOps(SelectionKey.OP_WRITE);
            key.attach(buffer); // 暂存数据
        } else if (read == -1) {
            System.out.println("客户端断开: " + client.getRemoteAddress());
            client.close();
        }
    }

    private static void handleWrite(SelectionKey key) throws IOException {
        SocketChannel client = (SocketChannel) key.channel();
        ByteBuffer buffer = (ByteBuffer) key.attachment();
        if (buffer == null) return;

        buffer.flip();

        // 如果发送缓冲区满了,需要继续等待
        if (buffer.hasRemaining()) {
            client.write(buffer);
        }

        if (!buffer.hasRemaining()) {
            // 发完了,切回读模式
            key.interestOps(SelectionKey.OP_READ);
            key.attach(null);
        }
    }
}

核心逻辑

  1. accept 后注册 OP_READ
  2. 读完数据后切换到 OP_WRITE(等网络可写)
  3. 写完后再切回 OP_READ

Echo 客户端

java
public class NIOEchoClient {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel client = SocketChannel.open();
        client.configureBlocking(false);

        // 连接服务器
        client.connect(new InetSocketAddress("localhost", 8080));
        client.register(selector, SelectionKey.OP_CONNECT);

        ByteBuffer buffer = ByteBuffer.allocate(1024);

        while (true) {
            selector.select();
            for (SelectionKey key : selector.selectedKeys()) {
                if (key.isConnectable()) {
                    SocketChannel ch = (SocketChannel) key.channel();
                    if (ch.finishConnect()) {
                        ch.register(selector, SelectionKey.OP_WRITE);
                        System.out.println("已连接到服务器");
                    }
                }
                if (key.isWritable()) {
                    // 发送数据
                    String msg = "Hello NIO!";
                    buffer.put(msg.getBytes());
                    buffer.flip();
                    client.write(buffer);
                    buffer.clear();
                    key.interestOps(SelectionKey.OP_READ);
                }
                if (key.isReadable()) {
                    // 接收回应
                    buffer.clear();
                    int read = client.read(buffer);
                    if (read > 0) {
                        buffer.flip();
                        byte[] data = new byte[buffer.remaining()];
                        buffer.get(data);
                        System.out.println("收到回应: " + new String(data));
                        break;
                    }
                }
            }
            selector.selectedKeys().clear();
        }
    }
}

粘包/拆包:网络编程的拦路虎

TCP 是流协议,不保留消息边界。网络编程必须处理两个经典问题:

  • 粘包:多个小包粘在一起一次到达
  • 拆包:一个大包被拆成多次到达

常见解决方案:定长协议长度+内容协议分隔符协议

这里演示长度+内容协议

java
public class FramingDecoder {
    // 协议格式:[4字节长度][数据内容]
    private ByteBuffer buffer = ByteBuffer.allocate(4096);

    public List&lt;String&gt; decode(SocketChannel channel) throws IOException {
        List&lt;String&gt; messages = new ArrayList&lt;&gt;();
        channel.read(buffer);
        buffer.flip();

        while (buffer.remaining() >= 4) {
            buffer.mark(); // 标记长度字段的位置
            int length = buffer.getInt();

            if (buffer.remaining() >= length) {
                byte[] data = new byte[length];
                buffer.get(data);
                messages.add(new String(data, StandardCharsets.UTF_8));
            } else {
                // 数据不完整,重置到长度字段开头
                buffer.reset();
                break;
            }
        }

        buffer.compact(); // 把未处理完的数据移到开头
        return messages;
    }
}

解码逻辑

  1. 先读 4 字节获取长度
  2. 检查剩余数据是否够一个完整消息
  3. 够就解析,不够就等下次数据

直接用 NIO 写生产级代码是自虐行为。生产环境推荐用 Netty,它帮你处理了:

  • 粘包/拆包
  • 事件循环
  • 线程模型
  • ByteBuf(比 ByteBuffer 更强大)

但理解 NIO 的原理,是理解 Netty 的基础。

下一节,我们来对比一下 BIO 和 NIO 的差异。

基于 VitePress 构建