2023-09-23  阅读(122)
原文作者:李林超 原文地址: https://www.lilinchao.com/archives/2112.html

[TOC]

一、消息边界问题的产生

1.1 服务端代码

    import lombok.extern.slf4j.Slf4j;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    import java.util.Iterator;
    
    /**
     * Created by lilinchao
     * Date 2022/6/3
     * Description 消息边界问题  服务端
     */
    @Slf4j
    public class Server {
        public static void main(String[] args) throws IOException {
    
            //1.创建selector,管理多个channel
            Selector selector = Selector.open();
    
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
    
            //2. 建立channel和selector之间的联系(注册)
            SelectionKey sscKey = ssc.register(selector, 0, null);
            sscKey.interestOps(SelectionKey.OP_ACCEPT);
            ssc.bind(new InetSocketAddress(8080));
    
            while (true){
                //3. selector.select()方法,没有事件就阻塞,有了事件发送了就恢复运行继续向下处理
                selector.select();
    
                //4. 处理事件,selectionKeys拿到所有发生的可读可写的事件
                Iterator<SelectionKey> iterator = selector.selectedKeys().iterator();
    
                //多个key的时候,accept和read方法都会触发事件,所以要区分事件类型
                while (iterator.hasNext()){
                    SelectionKey key = iterator.next();
                    //处理key的时候要从selectKeys中删除,否则会报错
                    iterator.remove();
    
                    //5.区分事件类型
                    if(key.isAcceptable()){
                        //拿到触发事件的channel
                        ServerSocketChannel channel = (ServerSocketChannel)key.channel();
                        SocketChannel sc = channel.accept();
                        //设置为非阻塞
                        sc.configureBlocking(false);
                        //scKey管sc的channel
                        SelectionKey scKey = sc.register(selector, 0, null);
                        //scKey关注读事件,也就是说客户端的通道关注可读事件
                        scKey.interestOps(SelectionKey.OP_READ);
                    }else if(key.isReadable()){
                        //客户端关闭之后也会引发read事件,这时需要从key中remove掉,否则拿不到channel,报错
                        try {
                            SocketChannel channel = (SocketChannel)key.channel();
                            //将缓冲区大小设置为4
                            ByteBuffer buffer1 = ByteBuffer.allocate(4);
                            //客户端正常断开,read返回值是-1
                            int read = channel.read(buffer1);
                            if(read == -1){
                                //正常断开
                                key.channel();
                            }
                            buffer1.flip();
                            System.out.println(Charset.defaultCharset().decode(buffer1));
                        } catch (IOException e) {
                            e.printStackTrace();
                            key.cancel();//客户端断开,需要将key取消(从selector的key集合中真正删除)
                        }
                    }
    
                }
    
            }
    
        }
    }

1.2 客户端代码

    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.net.SocketAddress;
    import java.nio.channels.SocketChannel;
    /**
     * Created by lilinchao
     * Date 2022/6/3
     * Description 客户端
     */
    public class Client {
        public static void main(String[] args) throws IOException {
            SocketChannel sc = SocketChannel.open();
            sc.connect(new InetSocketAddress("localhost", 8080));
    
            SocketAddress localAddress = sc.getLocalAddress();
            System.out.println("waiting...");
        }
    }

运行程序

(1)运行服务端代码

(2)通过Debug模式运行客户端代码

(3)通过客户端向服务端发送如下请求

    sc.write(Charset.defaultCharset().encode("中国"));

服务端输出结果

202309232223437291.png

从输出结果可以看到, 字出现了乱码。

问题分析

因为在服务端代码中设置的接收客户端数据的缓冲区大小是4个字节,在UTF-8编码中,一个汉字占三个字节,也就是服务端在接收客户端发送到的消息时,只接收到了中字的三个字节和国字的第一个字节就进行了打印输出,导致国字出现了半包问题,产生了乱码。

二、消息边界问题分析

202309232223441452.png

分析

  • 时刻1 :ByteBufeer较小,但是发送过来的消息比较大,一次处理不完;
  • 时刻2 :ByteBufeer较大,消息比较小。会出现半包现象
  • 时刻3 :ButeBuffer可以一次性接收客户端发送过来的多条消息。此时会出现黏包现象

解决思路

(1) 固定消息长度 ,数据包大小一样,服务器按预定长度读取,当发送的数据较少时,需要将数据进行填充,直到长度与消息规定长度一致。缺点是浪费带宽

(2) 按分隔符拆分 ,缺点是效率低,需要一个一个字符去匹配分隔符

(3) TLV 格式,即 Type 类型、Length 长度、Value 数据 (也就是在消息开头 用一些空间存放后面数据的长度 ),如HTTP请求头中的Content-Type与 Content-Length 。类型和长度已知的情况下,就可以方便获取消息大小,分配合适的 buffer,缺点是 buffer 需要提前分配,如果内容过大,则影响 server 吞吐量

202309232223444973.png

三、解决消息边界问题

本示例将按照第二种方式, 按分隔符拆分 来解决消息边界问题。

3.1 附件与扩容

Channel的register方法还有 第三个参数附件,可以向其中放入一个Object类型的对象,该对象会与登记的Channel以及其对应的SelectionKey绑定,可以从SelectionKey获取到对应通道的附件

    public final SelectionKey register(Selector sel, int ops, Object att)

可通过SelectionKey的 attachment()方法获得附件

    ByteBuffer buffer = (ByteBuffer) key.attachment();

需要在Accept事件发生后,将通道注册到Selector中时, 对每个通道添加一个ByteBuffer附件 ,让每个通道发生读事件时都使用自己的通道,避免与其他通道发生冲突而导致问题

    // 设置为非阻塞模式,同时将连接的通道也注册到选择器中,同时设置附件
    socketChannel.configureBlocking(false);
    ByteBuffer buffer = ByteBuffer.allocate(16);
    // 添加通道对应的Buffer附件
    socketChannel.register(selector, SelectionKey.OP_READ, buffer);

当Channel中的数据大于缓冲区时,需要对缓冲区进行 扩容 操作。此代码中的扩容的判定方法: Channel调用compact方法后,position与limit相等,说明缓冲区中的数据并未被读取(容量太小),此时创建新的缓冲区,其大小扩大为两倍。同时还要将旧缓冲区中的数据拷贝到新的缓冲区中,同时调用SelectionKey的attach方法将新的缓冲区作为新的附件放入SelectionKey中

    // 如果缓冲区太小,就进行扩容
    if (buffer.position() == buffer.limit()) {
        ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity()*2);
        // 将旧buffer中的内容放入新的buffer中
        newBuffer.put(buffer);
        // 将新buffer作为附件放到key中
        key.attach(newBuffer);
    }

3.2 完整代码

  • 需求

将服务端缓冲区大小设置成16,客户端向服务端发送数据21个字节的数据0123456789abcdef3333\n > > + \n为消息的分隔符,占一个字节大小

  • 过程分析

202309232223462514.png

  • 服务端代码
    import lombok.extern.slf4j.Slf4j;
    
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.ByteBuffer;
    import java.nio.channels.SelectionKey;
    import java.nio.channels.Selector;
    import java.nio.channels.ServerSocketChannel;
    import java.nio.channels.SocketChannel;
    import java.util.Iterator;
    
    import static com.lilinchao.nio.bytebuffer_2.ByteBufferUtil.debugAll;
    
    /**
     * Created by lilinchao
     * Date 2022/6/3
     * Description 服务端
     */
    @Slf4j
    public class MessageBorderServer {
        public static void main(String[] args) throws IOException {
            // 1. 创建 selector, 管理多个 channel
            Selector selector = Selector.open();
            ServerSocketChannel ssc = ServerSocketChannel.open();
            ssc.configureBlocking(false);
            // 2. 建立 selector 和 channel 的联系(注册)
            // SelectionKey 就是将来事件发生后,通过它可以知道事件和哪个channel的事件
            SelectionKey sscKey = ssc.register(selector, 0, null);
            // key 只关注 accept 事件
            sscKey.interestOps(SelectionKey.OP_ACCEPT);
            log.debug("sscKey:{}", sscKey);
            ssc.bind(new InetSocketAddress(8080));
            while (true) {
                // 3. select 方法, 没有事件发生,线程阻塞,有事件,线程才会恢复运行
                // select 在事件未处理时,它不会阻塞, 事件发生后要么处理,要么取消,不能置之不理
                selector.select();
                // 4. 处理事件, selectedKeys 内部包含了所有发生的事件
                Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); // accept, read
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 处理key 时,要从 selectedKeys 集合中删除,否则下次处理就会有问题
                    iter.remove();
                    log.debug("key: {}", key);
                    // 5. 区分事件类型
                    if (key.isAcceptable()) { // 如果是 accept
                        ServerSocketChannel channel = (ServerSocketChannel) key.channel();
                        SocketChannel sc = channel.accept();
                        sc.configureBlocking(false);
                        ByteBuffer buffer = ByteBuffer.allocate(16); // attachment
                        // 将一个 byteBuffer 作为附件关联到 selectionKey 上
                        SelectionKey scKey = sc.register(selector, 0, buffer);
                        scKey.interestOps(SelectionKey.OP_READ);
                        log.debug("{}", sc);
                        log.debug("scKey:{}", scKey);
                    } else if (key.isReadable()) { // 如果是 read
                        try {
                            SocketChannel channel = (SocketChannel) key.channel(); // 拿到触发事件的channel
                            // 获取 selectionKey 上关联的附件
                            ByteBuffer buffer = (ByteBuffer) key.attachment();
                            int read = channel.read(buffer); // 如果是正常断开,read 的方法的返回值是 -1
                            if(read == -1) {
                                key.cancel();
                            } else {
                                split(buffer);
                                // 需要扩容
                                if (buffer.position() == buffer.limit()) {
                                    ByteBuffer newBuffer = ByteBuffer.allocate(buffer.capacity() * 2);
                                    buffer.flip();
                                    newBuffer.put(buffer); // 0123456789abcdef3333\n
                                    key.attach(newBuffer);
                                }
                            }
    
                        } catch (IOException e) {
                            e.printStackTrace();
                            key.cancel();  // 因为客户端断开了,因此需要将 key 取消(从 selector 的 keys 集合中真正删除 key)
                        }
                    }
                }
            }
        }
    
        private static void split(ByteBuffer source) {
            source.flip();
            for (int i = 0; i < source.limit(); i++) {
                // 找到一条完整消息
                if (source.get(i) == '\n') {
                    int length = i + 1 - source.position();
                    // 把这条完整消息存入新的 ByteBuffer
                    ByteBuffer target = ByteBuffer.allocate(length);
                    // 从 source 读,向 target 写
                    for (int j = 0; j < length; j++) {
                        target.put(source.get());
                    }
                    debugAll(target);
                }
            }
            source.compact(); // 0123456789abcdef  position 16 limit 16
        }
    }
  • 客户端代码
    import java.io.IOException;
    import java.net.InetSocketAddress;
    import java.nio.channels.SocketChannel;
    import java.nio.charset.Charset;
    
    /**
     * Created by lilinchao
     * Date 2022/6/3
     * Description 1.0
     */
    public class MessageBorderClient {
        public static void main(String[] args) throws IOException {
            SocketChannel sc = SocketChannel.open();
            sc.connect(new InetSocketAddress("localhost", 8080));
            sc.write(Charset.defaultCharset().encode("0123456789abcdef3333\n"));
            System.in.read();
        }
    }
  • 输出结果
    11:50:04 [DEBUG] [main] c.l.n.b.MessageBorderServer - sscKey:sun.nio.ch.SelectionKeyImpl@7dc36524
    11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - key: sun.nio.ch.SelectionKeyImpl@7dc36524
    11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - java.nio.channels.SocketChannel[connected local=/127.0.0.1:8080 remote=/127.0.0.1:51861]
    11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - scKey:sun.nio.ch.SelectionKeyImpl@27f674d
    11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - key: sun.nio.ch.SelectionKeyImpl@27f674d
    11:50:32 [DEBUG] [main] c.l.n.b.MessageBorderServer - key: sun.nio.ch.SelectionKeyImpl@27f674d
    +--------+-------------------- all ------------------------+----------------+
    position: [21], limit: [21]
             +-------------------------------------------------+
             |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
    +--------+-------------------------------------------------+----------------+
    |00000000| 30 31 32 33 34 35 36 37 38 39 61 62 63 64 65 66 |0123456789abcdef|
    |00000010| 33 33 33 33 0a                                  |3333.           |
    +--------+-------------------------------------------------+----------------+

四、bytebuffer大小分配

  • 每个 channel 都需要记录可能被切分的消息,因为 ByteBuffer 不能被多个 channel 共同使用,因此需要为每个 channel 维护一个独立的 ByteBuffer

  • ByteBuffer 不能太大,比如一个 ByteBuffer 1Mb 的话,要支持百万连接就要 1Tb 内存,因此需要设计大小可变的 ByteBuffer

    • 一种思路是首先分配一个较小的 buffer,例如 4k,如果发现数据不够,再分配 8k 的 buffer,将 4k buffer 内容拷贝至 8k buffer,优点是消息连续容易处理,缺点是数据拷贝耗费性能,参考实现 http://tutorials.jenkov.com/java-performance/resizable-array.html
    • 另一种思路是用多个数组组成 buffer,一个数组不够,把多出来的内容写入新的数组,与前面的区别是消息存储不连续解析复杂,优点是避免了拷贝引起的性能损耗

附参考文章:

《黑马程序员Netty教程》


Java 面试宝典是大明哥全力打造的 Java 精品面试题,它是一份靠谱、强大、详细、经典的 Java 后端面试宝典。它不仅仅只是一道道面试题,而是一套完整的 Java 知识体系,一套你 Java 知识点的扫盲贴。

它的内容包括:

  • 大厂真题:Java 面试宝典里面的题目都是最近几年的高频的大厂面试真题。
  • 原创内容:Java 面试宝典内容全部都是大明哥原创,内容全面且通俗易懂,回答部分可以直接作为面试回答内容。
  • 持续更新:一次购买,永久有效。大明哥会持续更新 3+ 年,累计更新 1000+,宝典会不断迭代更新,保证最新、最全面。
  • 覆盖全面:本宝典累计更新 1000+,从 Java 入门到 Java 架构的高频面试题,实现 360° 全覆盖。
  • 不止面试:内容包含面试题解析、内容详解、知识扩展,它不仅仅只是一份面试题,更是一套完整的 Java 知识体系。
  • 宝典详情:https://www.yuque.com/chenssy/sike-java/xvlo920axlp7sf4k
  • 宝典总览:https://www.yuque.com/chenssy/sike-java/yogsehzntzgp4ly1
  • 宝典进展:https://www.yuque.com/chenssy/sike-java/en9ned7loo47z5aw

目前 Java 面试宝典累计更新 400+ 道,总字数 42w+。大明哥还在持续更新中,下图是大明哥在 2024-12 月份的更新情况:

想了解详情的小伙伴,扫描下面二维码加大明哥微信【daming091】咨询

同时,大明哥也整理一套目前市面最常见的热点面试题。微信搜[大明哥聊 Java]或扫描下方二维码关注大明哥的原创公众号[大明哥聊 Java] ,回复【面试题】 即可免费领取。

阅读全文