切换主题
四、网络编程
一、Selector
mermaid
graph TD
subgraph selector 版
thread --> selector
selector --> c1(channel)
selector --> c2(channel)
selector --> c3(channel)
end好处
- 一个线程配合 selector 就可以监控多个 channel 的事件,事件发生线程才去处理。避免非阻塞模式下所做无用功
- 让这个线程能够被充分利用
- 节约了线程的数量
- 减少了线程上下文切换
1、创建
java
Selector selector = Selector.open();2、绑定给 Channel 事件
也称之为注册事件,绑定的事件 selector 才会关心
java
channel.configureBlocking(false);
SelectionKey key = channel.register(selector, 绑定事件);- channel 必须工作在非阻塞模式
- FileChannel 没有非阻塞模式,因此不能配合 selector 一起使用
- 绑定的事件类型可以有
- connect - 客户端连接成功时触发
- accept - 服务器端成功接受连接时触发
- read - 数据可读入时触发,有因为接收能力弱,数据暂不能读入的情况
- write - 数据可写出时触发,有因为发送能力弱,数据暂不能写出的情况
3、处理 accept 事件
服务端代码
java
@Slf4j
public class TestSelector {
public static void main(String[] args) throws IOException {
// 1、创建selector,管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
//设置为非阻塞模式
ssc.configureBlocking(false); // 必须执行!
// 2、注册channel, key关注accept事件
SelectionKey sscKey = ssc.register(selector, SelectionKey.OP_ACCEPT, null);
log.debug("register key :{}", sscKey);
// 绑定监听端口
ssc.bind(new InetSocketAddress(8888));
while (true) {
//3、阻塞等待,直到有通道就绪,返回就绪通道数
int readyChannelCount = selector.select();
log.debug("本次就绪的Channel数量 = {}", readyChannelCount);
System.out.println(readyChannelCount);
// 4、若有通道就绪,遍历处理所有就绪事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
// 查看【accept前】Selector中注册的总Channel数
log.debug("accept前,Selector总Channel数 = {}", selector.keys().size());
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false); // ✅ 核心修复2:SocketChannel必须设为非阻塞
log.debug("成功accept客户端通道:{}", sc);
// ✅ 查看【accept后】Selector中注册的总Channel数(核心需求)
int totalChannelNum = selector.keys().size();
log.debug("accept后,Selector总Channel数 = {}", totalChannelNum);
}
}
}
}
}客户端代码
java
public class TestClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8888));
System.out.println("Connected to server");
}
}4、处理read事件
示例代码
java
if (key.isReadable()) {
SocketChannel channel = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(16);
channel.read(buffer);
buffer.flip();
debugRead(buffer);
}处理消息边界

- 一种思路是固定消息长度,数据包大小一样,服务器按预定长度读取,缺点是浪费带宽
- 另一种思路是按分隔符拆分,缺点是效率低
- TLV 格式,即 Type 类型、Length 长度、Value 数据,类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
- Http 1.1 是 TLV 格式
- Http 2.0 是 LTV 格式
方法一:分隔符拆分
客户端代码
java
public class TestClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress(8888));
sc.write(Charset.defaultCharset().encode("hello\nworld\n111111111111111111111111111111111111111111111111\n"));
System.in.read();
sc.close();
}
}服务端代码
java
import static utils.ByteBufferUtil.debugAll;
public class TestSelector {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.configureBlocking(false);
serverSocketChannel.bind(new InetSocketAddress(8888));
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey selectionKey = iter.next();
iter.remove();
try {
if (selectionKey.isAcceptable()) {
ServerSocketChannel channel = (ServerSocketChannel) selectionKey.channel();
SocketChannel socketChannel = channel.accept();
socketChannel.configureBlocking(false);
// 将一个bytebuffer关联到selectionkey上
ByteBuffer buffer = ByteBuffer.allocate(16);
socketChannel.register(selector, SelectionKey.OP_READ, buffer);
} else if (selectionKey.isReadable()) {
SocketChannel channel = (SocketChannel) selectionKey.channel();
ByteBuffer buffer = (ByteBuffer) selectionKey.attachment();
int read = channel.read(buffer);
if (read == -1) {
selectionKey.cancel();
channel.close();
return;
} else {
split(buffer);
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer);
selectionKey.attach(newBuffer);
}
}
}
} catch (IOException e) {
e.printStackTrace();
selectionKey.cancel();
if (selectionKey.channel() != null) {
selectionKey.channel().close();
}
}
}
}
}
private static void split(ByteBuffer buffer) {
buffer.flip();
for (int i = 0; i < buffer.limit(); i++) {
if (buffer.get(i) == '\n') {
int length = i - buffer.position() + 1;
ByteBuffer target = ByteBuffer.allocate(length);
for (int j = 0; j < length; j++) {
target.put(buffer.get());
}
debugAll(target);
}
}
buffer.compact();
}
}5、处理 write 事件
一次无法写完例子
- 非阻塞模式下,无法保证把 buffer 中所有数据都写入 channel,因此需要追踪 write 方法的返回值(代表实际写入字节数)
- 用 selector 监听所有 channel 的可写事件,每个 channel 都需要一个 key 来跟踪 buffer,但这样又会导致占用内存过多,就有两阶段策略
- 当消息处理器第一次写入消息时,才将 channel 注册到 selector 上
- selector 检查 channel 上的可写事件,如果所有的数据写完了,就取消 channel 的注册
- 如果不取消,会每次可写均会触发 write 事件
服务端
java
public class WriteServer {
public static void main(String[] args) throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.bind(new InetSocketAddress(8080));
Selector selector = Selector.open();
ssc.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isAcceptable()) {
SocketChannel sc = ssc.accept();
sc.configureBlocking(false);
SelectionKey sckey = sc.register(selector, SelectionKey.OP_READ);
// 1. 向客户端发送内容
StringBuilder sb = new StringBuilder();
for (int i = 0; i < 3000000; i++) {
sb.append("a");
}
ByteBuffer buffer = Charset.defaultCharset().encode(sb.toString());
int write = sc.write(buffer);
// 3. write 表示实际写了多少字节
System.out.println("实际写入字节:" + write);
// 4. 如果有剩余未读字节,才需要关注写事件
if (buffer.hasRemaining()) {
// read 1 write 4
// 在原有关注事件的基础上,多关注 写事件
sckey.interestOps(sckey.interestOps() + SelectionKey.OP_WRITE);
// 把 buffer 作为附件加入 sckey
sckey.attach(buffer);
}
} else if (key.isWritable()) {
ByteBuffer buffer = (ByteBuffer) key.attachment();
SocketChannel sc = (SocketChannel) key.channel();
int write = sc.write(buffer);
System.out.println("实际写入字节:" + write);
if (!buffer.hasRemaining()) { // 写完了
key.interestOps(key.interestOps() - SelectionKey.OP_WRITE);
key.attach(null);
}
}
}
}
}
}客户端
java
public class WriteClient {
public static void main(String[] args) throws IOException {
Selector selector = Selector.open();
SocketChannel sc = SocketChannel.open();
sc.configureBlocking(false);
sc.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
sc.connect(new InetSocketAddress("localhost", 8080));
int count = 0;
while (true) {
selector.select();
Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
if (key.isConnectable()) {
System.out.println(sc.finishConnect());
} else if (key.isReadable()) {
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
count += sc.read(buffer);
buffer.clear();
System.out.println(count);
}
}
}
}
}二、多线程
没完全搞懂代码,后期再看
java
package org.example.selector;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;
import static utils.ByteBufferUtil.debugAll;
public class TestBossAndWorkers {
public static void main(String[] args) {
// 程序入口,启动boss主线程
}
@Slf4j
static class BossEventLoop implements Runnable {
// boss专属选择器,仅关注accept事件
private Selector boss;
// Worker事件循环数组:存储所有处理读写的子Reactor
private WorkerEventLoop[] workers;
// 防止boss重复启动的开关
private volatile boolean start = false;
// 原子计数器
AtomicInteger index = new AtomicInteger(0);
// 初始化方法,开启服务端通道,绑定端口,初始化selector,启动boss线程
public void register() throws IOException {
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.bind(new InetSocketAddress(8888));
ssc.configureBlocking(false);
// 打开boss专属selector
boss = Selector.open();
SelectionKey ssckey = ssc.register(boss, 0, null);
ssckey.interestOps(SelectionKey.OP_ACCEPT);
// 5. 初始化Worker数组(此处简化为2个,生产环境用CPU核心数)
workers = initEventLoops();
// 6. 启动Boss线程,执行run方法中的事件循环
new Thread(this, "boss").start();
log.debug("boss start... 监听8080端口");
start = true;
}
public WorkerEventLoop[] initEventLoops() {
// WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[Runtime.getRuntime().availableProcessors()];
WorkerEventLoop[] workerEventLoops = new WorkerEventLoop[2];
for (int i = 0; i < workerEventLoops.length; i++) {
workerEventLoops[i] = new WorkerEventLoop(i); // 每个Worker绑定唯一编号
}
return workerEventLoops;
}
@Override
public void run() {
while (true) { // 事件循环永不停止
try {
// 1. 阻塞等待:直到有ACCEPT事件就绪(客户端连接)
boss.select();
// 2. 遍历就绪的事件集合
Iterator<SelectionKey> iter = boss.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove(); // 必须移除,防止Selector重复处理事件
// 3. 仅处理ACCEPT事件(Boss的专属职责)
if (key.isAcceptable()) {
// 强转为服务端通道,接收客户端连接
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel sc = ssc.accept(); // 拿到与客户端的通信通道
sc.configureBlocking(false); // 必须设置非阻塞 → 适配Worker的Selector
log.debug("客户端 {} 已连接", sc.getRemoteAddress());
// 4. 轮询分配连接给Worker:index自增取模 → 负载均衡(worker-0、worker-1轮流接收)
workers[index.getAndIncrement() % workers.length].register(sc);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
/**
* Worker事件循环(从Reactor)→ 多线程运行,每个Worker独占一个Selector
* 核心职责:处理客户端的读事件(OP_READ)、读取客户端发送的数据
*/
@Slf4j
static class WorkerEventLoop implements Runnable {
// worker专属选择器。仅关注read事件
private Selector worker;
// 防止worker重复启动的开关
private volatile boolean start = false;
// worker编号
private int index;
private final ConcurrentLinkedDeque<Runnable> tasks = new ConcurrentLinkedDeque<>();
public WorkerEventLoop(int index) {
this.index = index;
}
/**
* 对外提供的注册方法:将客户端通道(SocketChannel)注册到当前Worker的Selector
* 注意:注册操作必须在Worker自身线程执行 → 核心线程安全约束
*/
public void register(SocketChannel sc) throws IOException {
// 首次注册时,初始化Worker并启动线程
if (!start) {
worker = Selector.open();
new Thread(this, "worker" + index).start();
start = true;
log.debug("worker-{}已启动", index);
}
tasks.add(() -> {
try {
sc.register(worker, SelectionKey.OP_READ, null);
worker.selectNow();
} catch (IOException e) {
e.printStackTrace();
throw new RuntimeException(e);
}
});
worker.wakeup();
}
@Override
public void run() {
while (true) {
try {
// 1、 阻塞等待,知道有读事件,或被wakeup唤醒
worker.select();
// 2、处理任务队列中的任务
Runnable task = tasks.poll();
if (task != null) {
task.run();
}
// 3. 遍历就绪的读事件,处理客户端数据读取
Iterator<SelectionKey> iter = worker.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
iter.remove();
// 仅处理读事件
if (key.isReadable()) {
SocketChannel sc = (SocketChannel) key.channel();
ByteBuffer buffer = ByteBuffer.allocate(128);
try {
int read = sc.read(buffer);
if (read == -1) {
key.cancel();
sc.close();
log.debug("客户端 {} 已断开连接", sc.getRemoteAddress());
} else if (read > 0) {
buffer.flip();
log.debug("worker-{} 接收 {} 数据:", index, sc.getRemoteAddress());
debugAll(buffer); // 打印读取到的字节数据
}
} catch (IOException e) {
e.printStackTrace();
key.cancel();
sc.close();
throw new RuntimeException(e);
}
}
}
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}
}
}
DQ博客