Skip to content

四、网络编程

一、Selector

mermaid
graph TD
subgraph selector 版
thread --> selector
selector --> c1(channel)
selector --> c2(channel)
selector --> c3(channel)
end

好处

  • 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
  • 让这个线程能够被充分利用
  • 节约了线程的数量
  • 减少了线程上下文切换

1、创建

java
Selector selector = Selector.open();

2、绑定给 Channel 事件

也称之为注册事件,绑定的事件 selector 才会关心

java
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);
  • channel 必须工作在非阻塞模式
  • FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
  • 绑定的事件类型可以有
    • connect - 客户端连接成功时触发
    • accept - 服务器端成功接受连接时触发
    • read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
    • write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况

3、处理 accept 事件

服务端代码

java
@Slf4j
public class TestSelector {
    public static void main(String[] args) throws IOException {
//        1、创建selector,管理多个channel
        Selector selector = Selector.open();
        ServerSocketChannel ssc = ServerSocketChannel.open();
        //设置为非阻塞模式
        ssc.configureBlocking(false); // 必须执行!

//        2、注册channel, key关注accept事件
        SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT, null);
        log.debug("register key :{}", sscKey);
//        绑定监听端口
        ssc.bind(new InetSocketAddress(8888));

        while (true) {
            //3、阻塞等待,直到有通道就绪,返回就绪通道数
            int readyChannelCount = selector.select();
            log.debug("本次就绪的Channel数量 = {}", readyChannelCount);
            System.out.println(readyChannelCount);

//            4、若有通道就绪,遍历处理所有就绪事件
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    // 查看【accept前】Selector中注册的总Channel数
                    log.debug("accept前,Selector总Channel数 = {}", selector.keys().size());

                    ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                    SocketChannel sc = channel.accept();
                    sc.configureBlocking(false); // ✅ 核心修复2:SocketChannel必须设为非阻塞
                    log.debug("成功accept客户端通道:{}", sc);

                    // ✅ 查看【accept后】Selector中注册的总Channel数(核心需求)
                    int totalChannelNum = selector.keys().size();
                    log.debug("accept后,Selector总Channel数 = {}", totalChannelNum);
                } 
            }
        }
    }
}

客户端代码

java
public class TestClient {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress("localhost", 8888));
        System.out.println("Connected to server");
    }
}

4、处理read事件

示例代码

java
if (key.isReadable()) {
     SocketChannel channel = (SocketChannel) key.channel();
     ByteBuffer buffer = ByteBuffer.allocate(16);
     channel.read(buffer);
     buffer.flip();
     debugRead(buffer);
 }

处理消息边界

  • 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
  • 另一种思路是按分隔符拆分,缺点是效率低
  • TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
    • Http 1.1 是 TLV 格式
    • Http 2.0 是 LTV 格式

方法一:分隔符拆分

客户端代码

java
public class TestClient {
    public static void main(String[] args) throws IOException {
        SocketChannel sc = SocketChannel.open();
        sc.connect(new InetSocketAddress(8888));
        sc.write(Charset.defaultCharset().encode("hello\nworld\n111111111111111111111111111111111111111111111111\n"));
        System.in.read();
        sc.close();
    }
}

服务端代码

java
import static utils.ByteBufferUtil.debugAll;

public class TestSelector {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
        serverSocketChannel.configureBlocking(false);
        serverSocketChannel.bind(new InetSocketAddress(8888));

        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey selectionKey = iter.next();
                iter.remove();

                try {
                    if (selectionKey.isAcceptable()) {
                        ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
                        SocketChannel socketChannel = channel.accept();
                        socketChannel.configureBlocking(false);
                        //                    将一个bytebuffer关联到selectionkey上
                        ByteBuffer buffer = ByteBuffer.allocate(16);
                        socketChannel.register(selector, SelectionKey.OP_READ, buffer);
                    } else if (selectionKey.isReadable()) {
                        SocketChannel channel = (SocketChannel) selectionKey.channel();
                        ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
                        int read = channel.read(buffer);
                        if (read == -1) {
                            selectionKey.cancel();
                            channel.close();
                            return;
                        } else {
                            split(buffer);
                            if (buffer.position() == buffer.limit()) {
                                ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                buffer.flip();
                                newBuffer.put(buffer);
                                selectionKey.attach(newBuffer);
                            }
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                    selectionKey.cancel();
                    if (selectionKey.channel() != null) {
                        selectionKey.channel().close();
                    }
                }
            }
        }
    }

    private static void split(ByteBuffer buffer) {
        buffer.flip();
        for (int i = 0; i < buffer.limit(); i++) {
            if (buffer.get(i) == '\n') {
                int length = i - buffer.position() + 1;
                ByteBuffer target = ByteBuffer.allocate(length);
                for (int j = 0; j < length; j++) {
                    target.put(buffer.get());
                }
                debugAll(target);
            }
        }
        buffer.compact();
    }
}

5、处理 write 事件

一次无法写完例子

  • 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
  • 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
    • 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
    • selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
    • 如果不取消,会每次可写均会触发 write 事件

服务端

java
public class WriteServer {

    public static void main(String[] args) throws IOException {
        ServerSocketChannel ssc = ServerSocketChannel.open();
        ssc.configureBlocking(false);
        ssc.bind(new InetSocketAddress(8080));

        Selector selector = Selector.open();
        ssc.register(selector, SelectionKey.OP_ACCEPT);

        while(true) {
            selector.select();

            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isAcceptable()) {
                    SocketChannel sc = ssc.accept();
                    sc.configureBlocking(false);
                    SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
                    // 1. 向客户端发送内容
                    StringBuilder sb = new StringBuilder();
                    for (int i = 0; i < 3000000; i++) {
                        sb.append("a");
                    }
                    ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
                    int write = sc.write(buffer);
                    // 3. write 表示实际写了多少字节
                    System.out.println("实际写入字节:" + write);
                    // 4. 如果有剩余未读字节,才需要关注写事件
                    if (buffer.hasRemaining()) {
                        // read 1  write 4
                        // 在原有关注事件的基础上,多关注 写事件
                        sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
                        // 把 buffer 作为附件加入 sckey
                        sckey.attach(buffer);
                    }
                } else if (key.isWritable()) {
                    ByteBuffer buffer = (ByteBuffer) key.attachment();
                    SocketChannel sc = (SocketChannel) key.channel();
                    int write = sc.write(buffer);
                    System.out.println("实际写入字节:" + write);
                    if (!buffer.hasRemaining()) { // 写完了
                        key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
                        key.attach(null);
                    }
                }
            }
        }
    }
}

客户端

java
public class WriteClient {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        SocketChannel sc = SocketChannel.open();
        sc.configureBlocking(false);
        sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
        sc.connect(new InetSocketAddress("localhost", 8080));
        int count = 0;
        while (true) {
            selector.select();
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                if (key.isConnectable()) {
                    System.out.println(sc.finishConnect());
                } else if (key.isReadable()) {
                    ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                    count += sc.read(buffer);
                    buffer.clear();
                    System.out.println(count);
                }
            }
        }
    }
}

二、多线程

没完全搞懂代码,后期再看

java
package org.example.selector;

import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;

import static utils.ByteBufferUtil.debugAll;

public class TestBossAndWorkers {
    public static void main(String[] args) {
//        程序入口,启动boss主线程

    }

    @Slf4j
    static class BossEventLoop implements Runnable {
        //        boss专属选择器,仅关注accept事件
        private Selector boss;
        // Worker事件循环数组:存储所有处理读写的子Reactor
        private WorkerEventLoop[] workers;
        //        防止boss重复启动的开关
        private volatile boolean start = false;
        //        原子计数器
        AtomicInteger index = new AtomicInteger(0);

//        初始化方法,开启服务端通道,绑定端口,初始化selector,启动boss线程
        public void register() throws IOException {
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.bind(new InetSocketAddress(8888));
            ssc.configureBlocking(false);

//            打开boss专属selector
            boss = Selector.open();
            SelectionKey ssckey = ssc.register(boss, 0, null);
            ssckey.interestOps(SelectionKey.OP_ACCEPT);
            // 5. 初始化Worker数组(此处简化为2个,生产环境用CPU核心数)
            workers = initEventLoops();
            // 6. 启动Boss线程,执行run方法中的事件循环
            new Thread(this, "boss").start();
            log.debug("boss start... 监听8080端口");
            start = true;
        }

        public WorkerEventLoop[] initEventLoops() {
//        WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[Runtime.getRuntime().availableProcessors()];
            WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
            for (int i = 0; i < workerEventLoops.length; i++) {
                workerEventLoops[i] = new WorkerEventLoop(i); // 每个Worker绑定唯一编号
            }
            return workerEventLoops;
        }

        @Override
        public void run() {
            while (true) { // 事件循环永不停止
                try {
                    // 1. 阻塞等待:直到有ACCEPT事件就绪(客户端连接)
                    boss.select();
                    // 2. 遍历就绪的事件集合
                    Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove(); // 必须移除,防止Selector重复处理事件
                        // 3. 仅处理ACCEPT事件(Boss的专属职责)
                        if (key.isAcceptable()) {
                            // 强转为服务端通道,接收客户端连接
                            ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
                            SocketChannel sc = ssc.accept(); // 拿到与客户端的通信通道
                            sc.configureBlocking(false); // 必须设置非阻塞 → 适配Worker的Selector
                            log.debug("客户端 {} 已连接", sc.getRemoteAddress());
                            // 4. 轮询分配连接给Worker:index自增取模 → 负载均衡(worker-0、worker-1轮流接收)
                            workers[index.getAndIncrement() % workers.length].register(sc);
                        }
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

    /**
     * Worker事件循环(从Reactor)→ 多线程运行,每个Worker独占一个Selector
     * 核心职责:处理客户端的读事件(OP_READ)、读取客户端发送的数据
     */
    @Slf4j
    static class WorkerEventLoop implements Runnable {
        //        worker专属选择器。仅关注read事件
        private Selector worker;
        //        防止worker重复启动的开关
        private volatile boolean start = false;
        //        worker编号
        private int index;

        private final ConcurrentLinkedDeque<Runnable> tasks = new ConcurrentLinkedDeque<>();

        public WorkerEventLoop(int index) {
            this.index = index;
        }

        /**
         * 对外提供的注册方法:将客户端通道(SocketChannel)注册到当前Worker的Selector
         * 注意:注册操作必须在Worker自身线程执行 → 核心线程安全约束
         */
        public void register(SocketChannel sc) throws IOException {
            // 首次注册时,初始化Worker并启动线程
            if (!start) {
                worker = Selector.open();
                new Thread(this, "worker" + index).start();
                start = true;
                log.debug("worker-{}已启动", index);
            }
            tasks.add(() -> {
                try {
                    sc.register(worker, SelectionKey.OP_READ, null);
                    worker.selectNow();
                } catch (IOException e) {
                    e.printStackTrace();
                    throw new RuntimeException(e);
                }
            });
            worker.wakeup();
        }

        @Override
        public void run() {
            while (true) {
                try {
//                   1、 阻塞等待,知道有读事件,或被wakeup唤醒
                    worker.select();
//                    2、处理任务队列中的任务
                    Runnable task = tasks.poll();
                    if (task != null) {
                        task.run();
                    }
//                    3. 遍历就绪的读事件,处理客户端数据读取
                    Iterator<SelectionKey> iter = worker.selectedKeys().iterator();
                    while (iter.hasNext()) {
                        SelectionKey key = iter.next();
                        iter.remove();
//                        仅处理读事件
                        if (key.isReadable()) {
                            SocketChannel sc = (SocketChannel) key.channel();
                            ByteBuffer buffer = ByteBuffer.allocate(128);
                            try {
                                int read = sc.read(buffer);
                                if (read == -1) {
                                    key.cancel();
                                    sc.close();
                                    log.debug("客户端 {} 已断开连接", sc.getRemoteAddress());
                                } else if (read > 0) {
                                    buffer.flip();
                                    log.debug("worker-{} 接收 {} 数据:", index, sc.getRemoteAddress());
                                    debugAll(buffer); // 打印读取到的字节数据
                                }
                            } catch (IOException e) {
                                e.printStackTrace();
                                key.cancel();
                                sc.close();
                                throw new RuntimeException(e);
                            }
                        }
                    }
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            }
        }
    }
}