[TOC]
一、消息边界问题的产生
1.1 服务端代码
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
/**
* Created by lilinchao
* Date 2022/6/3
* Description 消息边界问题 服务端
*/
@Slf4j
public class Server {
public static void main(String[] args) throws IOException {
//1.创建selector,管理多个channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
//2. 建立channel和selector之间的联系(注册)
SelectionKey sscKey = ssc.register(selector, 0, null);
sscKey.interestOps(SelectionKey.OP_ACCEPT);
ssc.bind(new InetSocketAddress(8080));
while (true){
//3. selector.select()方法,没有事件就阻塞,有了事件发送了就恢复运行继续向下处理
selector.select();
//4. 处理事件,selectionKeys拿到所有发生的可读可写的事件
Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
//多个key的时候,accept和read方法都会触发事件,所以要区分事件类型
while (iterator.hasNext()){
SelectionKey key = iterator.next();
//处理key的时候要从selectKeys中删除,否则会报错
iterator.remove();
//5.区分事件类型
if(key.isAcceptable()){
//拿到触发事件的channel
ServerSocketChannel channel = (ServerSocketChannel)key.channel();
SocketChannel sc = channel.accept();
//设置为非阻塞
sc.configureBlocking(false);
//scKey管sc的channel
SelectionKey scKey = sc.register(selector, 0, null);
//scKey关注读事件,也就是说客户端的通道关注可读事件
scKey.interestOps(SelectionKey.OP_READ);
}else if(key.isReadable()){
//客户端关闭之后也会引发read事件,这时需要从key中remove掉,否则拿不到channel,报错
try {
SocketChannel channel = (SocketChannel)key.channel();
//将缓冲区大小设置为4
ByteBuffer buffer1 = ByteBuffer.allocate(4);
//客户端正常断开,read返回值是-1
int read = channel.read(buffer1);
if(read == -1){
//正常断开
key.channel();
}
buffer1.flip();
System.out.println(Charset.defaultCharset().decode(buffer1));
} catch (IOException e) {
e.printStackTrace();
key.cancel();//客户端断开,需要将key取消(从selector的key集合中真正删除)
}
}
}
}
}
}
1.2 客户端代码
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.nio.channels.SocketChannel;
/**
* Created by lilinchao
* Date 2022/6/3
* Description 客户端
*/
public class Client {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
SocketAddress localAddress = sc.getLocalAddress();
System.out.println("waiting...");
}
}
运行程序
(1)运行服务端代码
(2)通过Debug模式运行客户端代码
(3)通过客户端向服务端发送如下请求
sc.write(Charset.defaultCharset().encode("中国"));
服务端输出结果
从输出结果可以看到, 国 字出现了乱码。
问题分析
因为在服务端代码中设置的接收客户端数据的缓冲区大小是4个字节,在UTF-8编码中,一个汉字占三个字节,也就是服务端在接收客户端发送到的消息时,只接收到了中字的三个字节和国字的第一个字节就进行了打印输出,导致国字出现了半包问题,产生了乱码。
二、消息边界问题分析
分析
- 时刻1 :ByteBufeer较小,但是发送过来的消息比较大,一次处理不完;
- 时刻2 :ByteBufeer较大,消息比较小。会出现半包现象
- 时刻3 :ButeBuffer可以一次性接收客户端发送过来的多条消息。此时会出现黏包现象
解决思路
(1) 固定消息长度 ,数据包大小一样,服务器按预定长度读取,当发送的数据较少时,需要将数据进行填充,直到长度与消息规定长度一致。缺点是浪费带宽
(2) 按分隔符拆分 ,缺点是效率低,需要一个一个字符去匹配分隔符
(3) TLV 格式,即 Type 类型、Length 长度、Value 数据 (也就是在消息开头 用一些空间存放后面数据的长度 ),如HTTP请求头中的Content-Type与 Content-Length 。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量
三、解决消息边界问题
本示例将按照第二种方式, 按分隔符拆分 来解决消息边界问题。
3.1 附件与扩容
Channel的register方法还有 第三个参数 :附件
,可以向其中放入一个Object类型的对象,该对象会与登记的Channel以及其对应的SelectionKey
绑定,可以从SelectionKey
获取到对应通道的附件
public final SelectionKey register(Selector sel, int ops, Object att)
可通过SelectionKey的 attachment()方法获得附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
需要在Accept事件发生后,将通道注册到Selector中时, 对每个通道添加一个ByteBuffer附件 ,让每个通道发生读事件时都使用自己的通道,避免与其他通道发生冲突而导致问题
// 设置为非阻塞模式,同时将连接的通道也注册到选择器中,同时设置附件
socketChannel.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16);
// 添加通道对应的Buffer附件
socketChannel.register(selector, SelectionKey.OP_READ, buffer);
当Channel中的数据大于缓冲区时,需要对缓冲区进行 扩容 操作。此代码中的扩容的判定方法: Channel调用compact方法后,position与limit相等,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用SelectionKey的attach方法将新的缓冲区作为新的附件放入SelectionKey中
// 如果缓冲区太小,就进行扩容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
// 将旧buffer中的内容放入新的buffer中
newBuffer.put(buffer);
// 将新buffer作为附件放到key中
key.attach(newBuffer);
}
3.2 完整代码
- 需求
将服务端缓冲区大小设置成16,客户端向服务端发送数据21个字节的数据
0123456789abcdef3333\n
> > +\n
为消息的分隔符,占一个字节大小
- 过程分析
- 服务端代码
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import static com.lilinchao.nio.bytebuffer_2.ByteBufferUtil.debugAll;
/**
* Created by lilinchao
* Date 2022/6/3
* Description 服务端
*/
@Slf4j
public class MessageBorderServer {
public static void main(String[] args) throws IOException {
// 1. 创建 selector, 管理多个 channel
Selector selector = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
// 2. 建立 selector 和 channel 的联系(注册)
// SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
SelectionKey sscKey = ssc.register(selector, 0, null);
// key 只关注 accept 事件
sscKey.interestOps(SelectionKey.OP_ACCEPT);
log.debug("sscKey:{}", sscKey);
ssc.bind(new InetSocketAddress(8080));
while (true) {
// 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
// select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
selector.select();
// 4. 处理事件, selectedKeys 内部包含了所有发生的事件
Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
while (iter.hasNext()) {
SelectionKey key = iter.next();
// 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
iter.remove();
log.debug("key: {}", key);
// 5. 区分事件类型
if (key.isAcceptable()) { // 如果是 accept
ServerSocketChannel channel = (ServerSocketChannel) key.channel();
SocketChannel sc = channel.accept();
sc.configureBlocking(false);
ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
// 将一个 byteBuffer 作为附件关联到 selectionKey 上
SelectionKey scKey = sc.register(selector, 0, buffer);
scKey.interestOps(SelectionKey.OP_READ);
log.debug("{}", sc);
log.debug("scKey:{}", scKey);
} else if (key.isReadable()) { // 如果是 read
try {
SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
// 获取 selectionKey 上关联的附件
ByteBuffer buffer = (ByteBuffer) key.attachment();
int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
if(read == -1) {
key.cancel();
} else {
split(buffer);
// 需要扩容
if (buffer.position() == buffer.limit()) {
ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
buffer.flip();
newBuffer.put(buffer); // 0123456789abcdef3333\n
key.attach(newBuffer);
}
}
} catch (IOException e) {
e.printStackTrace();
key.cancel(); // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
}
}
}
}
}
private static void split(ByteBuffer source) {
source.flip();
for (int i = 0; i < source.limit(); i++) {
// 找到一条完整消息
if (source.get(i) == '\n') {
int length = i + 1 - source.position();
// 把这条完整消息存入新的 ByteBuffer
ByteBuffer target = ByteBuffer.allocate(length);
// 从 source 读,向 target 写
for (int j = 0; j < length; j++) {
target.put(source.get());
}
debugAll(target);
}
}
source.compact(); // 0123456789abcdef position 16 limit 16
}
}
- 客户端代码
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
/**
* Created by lilinchao
* Date 2022/6/3
* Description 1.0
*/
public class MessageBorderClient {
public static void main(String[] args) throws IOException {
SocketChannel sc = SocketChannel.open();
sc.connect(new InetSocketAddress("localhost", 8080));
sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
System.in.read();
}
}
- 输出结果
11:50:04 [DEBUG] [main] c.l.n.b.MessageBorderServer - sscKey:sun.nio.ch.SelectionKeyImpl@7dc36524
11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - key: sun.nio.ch.SelectionKeyImpl@7dc36524
11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:51861]
11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - scKey:sun.nio.ch.SelectionKeyImpl@27f674d
11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - key: sun.nio.ch.SelectionKeyImpl@27f674d
11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - key: sun.nio.ch.SelectionKeyImpl@27f674d
+--------+-------------------- all ------------------------+----------------+
position: [21], limit: [21]
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 30 31 32 33 34 35 36 37 38 39 61 62 63 64 65 66 |0123456789abcdef|
|00000010| 33 33 33 33 0a |3333. |
+--------+-------------------------------------------------+----------------+
四、bytebuffer大小分配
-
每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer
-
ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer
- 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
- 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗
附参考文章:
《黑马程序员Netty教程》
Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。
它的内容包括:
- 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
- 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
- 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
- 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
- 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
- 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
- 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
- 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw
目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:
想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询
同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。