2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104795585

NioMessageUnsafe

202309132212385181.png
这个是专门来处理客户端连接的unsafe

read读客户端连接

上篇分析了可调节的接收缓冲区,这篇就来看看他是怎么用的,所以就要看NioMessageUnsafeNioByteUnsaferead,我们先介绍NioMessageUnsaferead。这个是用来接收客户端连接的,读取的是客户端通道。allocHandle的操作上篇文章里都讲了,就不多说了。这里是把所有的客户端连接接受完了,然后传递每一个连接的读事件。

            @Override
            public void read() {
                assert eventLoop().inEventLoop();
                final ChannelConfig config = config();//获取通道的配置
                final ChannelPipeline pipeline = pipeline();//获取管道
                final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();//获取接受缓冲区处理器
                allocHandle.reset(config);//重置配置参数
    
                boolean closed = false;
                Throwable exception = null;
                try {
                    try {
                        do {
                            int localRead = doReadMessages(readBuf);//读取消息到readBuf中
                            if (localRead == 0) {//没有消息
                                break;
                            }
                            if (localRead < 0) {//关闭是-1
                                closed = true;
                                break;
                            }
    
                            allocHandle.incMessagesRead(localRead);//读取消息数+1
                        } while (allocHandle.continueReading());
                    } catch (Throwable t) {
                        exception = t;
                    }
    
                    int size = readBuf.size();
                    for (int i = 0; i < size; i ++) {
                        readPending = false;
                        pipeline.fireChannelRead(readBuf.get(i));//传播读事件,传递的是客户端通道
                    }
                    readBuf.clear();
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();//传播读完成事件
    
                    if (exception != null) {
                        closed = closeOnReadError(exception);
                        pipeline.fireExceptionCaught(exception);
                    }
    
                    if (closed) {
                        inputShutdown = true;
                        if (isOpen()) {
                            close(voidPromise());
                        }
                    }
                } finally {
                    if (!readPending && !config.isAutoRead()) {//不自动读了,就删除读事件监听,一般读完成后,都设置成自动读的
                        removeReadOp();
                    }
                }
            }

NioServerSocketChannel的doReadMessages

可以看到这里准备去接受一个客户端。

202309132212390592.png
内部其实就是用ServerSocketChannel去接受一个SocketChannel啦。

202309132212400863.png
然后放进一个list中, 返回1。如果没接受到,那就返回0。后面就开始把这个通道传递下去了:

202309132212410134.png

NioByteUnsafe

202309132212420385.png
这个是专门来处理客户端读数据的。

read读客户端数据

其实逻辑和读客户端连接差不多,只不过这次是需要接收缓冲区分配器分配缓冲区来接受读的信息。然后接受一次传递一次读事件。这里为什么要读一次传一次,而不是全部读完再传呢,我想可能是因为这样读一部分处理一分部分,处理完了就释放内存,可以提高吞吐量,而且也不至于数据有积压,可能造成内存溢出。

     @Override
            public final void read() {
                final ChannelConfig config = config();
                if (shouldBreakReadReady(config)) {
                    clearReadPending();
                    return;
                }
                final ChannelPipeline pipeline = pipeline();
                final ByteBufAllocator allocator = config.getAllocator();
                final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
                allocHandle.reset(config);
    
                ByteBuf byteBuf = null;
                boolean close = false;
                try {
                    do {
                        byteBuf = allocHandle.allocate(allocator);//分配缓冲区
                        allocHandle.lastBytesRead(doReadBytes(byteBuf));//读取通道内数据到缓冲区,如果是客户端断开,读取数据个数是-1
                        if (allocHandle.lastBytesRead() <= 0) {//如果没读到数据0 或者客户端关闭-1
                            // nothing was read. release the buffer.
                            byteBuf.release();
                            byteBuf = null;
                            close = allocHandle.lastBytesRead() < 0;//-1
                            if (close) {//把通道关闭,也就是没发数据来
                                // There is nothing left to read as we received an EOF.
                                readPending = false;
                            }
                            break;
                        }
    
                        allocHandle.incMessagesRead(1);//增加读的消息数
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);//管道里传递消息
                        byteBuf = null;
                    } while (allocHandle.continueReading());
    
                    allocHandle.readComplete();
                    pipeline.fireChannelReadComplete();//读取完成
    
                    if (close) {
                        closeOnRead(pipeline);
                    }
                } catch (Throwable t) {
                    handleReadException(pipeline, byteBuf, t, close, allocHandle);
                } finally {
                  
                    if (!readPending && !config.isAutoRead()) {//如果还要继续读的话,就不会删除读监听
                        removeReadOp();
                    }
                }
            }

handleReadException处理读异常

如果出现了读的异常,因为有缓冲区申请在那边,如果缓冲区有数据,那就先把数据传递下去,否则就直接释放,然后还是按普通逻辑去统计,传递读完成事件,最后传递异常事件。

    private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
                    RecvByteBufAllocator.Handle allocHandle) {
                if (byteBuf != null) {
                    if (byteBuf.isReadable()) {//如果有可读的话,把传递消息
                        readPending = false;
                        pipeline.fireChannelRead(byteBuf);
                    } else {
                        byteBuf.release();
                    }
                }
                allocHandle.readComplete();
                pipeline.fireChannelReadComplete();//读完成
                pipeline.fireExceptionCaught(cause);//传递异常
                if (close || cause instanceof IOException) {//如果是IO异常
                    closeOnRead(pipeline);//传递自定义事件
                }
            }

closeOnRead读关闭(半关闭)

这里可能要涉及一个半关闭的概念,简单点就是说我们客户端和服务端的socket其实都有两个缓冲区,即读缓冲区和写缓冲区,如果TCP要分手了,客户端就会先发送关闭请求,然后把写缓冲区关闭了,就不写了,但是这个时候他是可以读客户端的消息的,即读缓冲区还没关,所以叫做半关闭。所以这里就是判断是否是把读缓冲区关了,如果关了,就直接传递自定义消息,否则就判断是否配置了半关闭,是的话就进行读关闭,传递自定义消息,否则就关闭通道了。

     private void closeOnRead(ChannelPipeline pipeline) {
                if (!isInputShutdown0()) {//单方关闭输出流,但是可以接受输入,即半关闭
                    if (isAllowHalfClosure(config())) {//是否配置了半关闭
                        shutdownInput();//关闭输入流
                        pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
                    } else {
                        close(voidPromise());
                    }
                } else {
                    inputClosedSeenErrorOnRead = true;
                    pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);//传递自定义事件
                }
            }

TCP关闭简单举例

其实简单点来说就是,2个人打电话,A说,我说完了( A写关闭 ),B还有话的话就说,如果也说完了,就跟A说,我也说完了( B写关闭 ).A收到后,说好的,再见( A读关闭 ),B听到后说,再见( B读关闭 )。至此,双方的读写都关闭了。不知道我这么描述对不对哈哈。

NioSocketChannelUnsafe

202309132212427926.png
这个是NioSocketChannelunsafe。主要是prepareToClose方法的覆盖,取消注册事件,返回全局唯一的GlobalEventExecutor

      private final class NioSocketChannelUnsafe extends NioByteUnsafe {
            @Override
            protected Executor prepareToClose() {
                try {
                    if (javaChannel().isOpen() && config().getSoLinger() > 0) {
    
                        doDeregister();
                        return GlobalEventExecutor.INSTANCE;
                    }
                } catch (Throwable ignore) {
    
                }
                return null;
            }
        }

至此unsafe基本都了解了,但是具体流程怎么工作的,还是要自己debug一下。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文