2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104793430

AbstractUnsafe的方法

beginRead开始读

判断下条件,准备开始读,真正读的是通道的doBeginRead方法。

     @Override
            public final void beginRead() {
                assertEventLoop();
    
                if (!isActive()) {
                    return;
                }
    
                try {
                    doBeginRead();
                } catch (final Exception e) {
                    invokeLater(new Runnable() {
                        @Override
                        public void run() {
                            pipeline.fireExceptionCaught(e);
                        }
                    });
                    close(voidPromise());
                }
            }

write写数据到出站缓冲区

这个以前讲过,就是write操作最后就是写入出站缓冲区。如果出站缓冲区关闭了,那就无用写了,释放消息即可,否则就封装后放入出站缓冲区里,里面是个单链表。

     @Override
            public final void write(Object msg, ChannelPromise promise) {
                assertEventLoop();
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    ReferenceCountUtil.release(msg);
                    return;
                }
    
                int size;
                try {
                    msg = filterOutboundMessage(msg);//封装成直接缓冲区
                    size = pipeline.estimatorHandle().size(msg);//获取缓冲区大小
                    if (size < 0) {
                        size = 0;
                    }
                } catch (Throwable t) {
                    safeSetFailure(promise, t);
                    ReferenceCountUtil.release(msg);
                    return;
                }
    
                outboundBuffer.addMessage(msg, size, promise);//往出站缓冲区添加消息
            }

flush准备将出站缓冲区数据发出去

给出站缓冲区数据打好冲刷标记,然后准备冲刷flush0

      @Override
            public final void flush() {
                assertEventLoop();
    
                ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null) {
                    return;
                }
    
                outboundBuffer.addFlush();
                flush0();
            }

flush0将出站缓冲区数据发出去

具体的发送方法,主要是调用通道的doWrite方法,里面才是将数据从通道中发出去。

      protected void flush0() {
                if (inFlush0) {//避免重入
     
                    return;
                }
    
                final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
                if (outboundBuffer == null || outboundBuffer.isEmpty()) {
                    return;
                }
    
                inFlush0 = true;
    
                if (!isActive()) {//通道失效的话
                    try {
                        if (isOpen()) {//报错并通知
                            outboundBuffer.failFlushed(new NotYetConnectedException(), true);
                        } else {
    //报错不通知
                            outboundBuffer.failFlushed(newClosedChannelException(initialCloseCause), false);
                        }
                    } finally {
                        inFlush0 = false;
                    }
                    return;
                }
    
                try {
                    doWrite(outboundBuffer);
                } catch (Throwable t) {
                    if (t instanceof IOException && config().isAutoClose()) {
                       
                        initialCloseCause = t;
                        close(voidPromise(), t, newClosedChannelException(t), false);
                    } else {
                        try {
                            shutdownOutput(voidPromise(), t);
                        } catch (Throwable t2) {
                            initialCloseCause = t;
                            close(voidPromise(), t2, newClosedChannelException(t), false);
                        }
                    }
                } finally {
                    inFlush0 = false;
                }
            }

newClosedChannelException创建一个通道关闭异常

将异常信息封装成ClosedChannelException

      private ClosedChannelException newClosedChannelException(Throwable cause) {
                ClosedChannelException exception = new ClosedChannelException();
                if (cause != null) {
                    exception.initCause(cause);
                }
                return exception;
            }

voidPromise没干什么事的回调

     @Override
            public final ChannelPromise voidPromise() {
                assertEventLoop();
    
                return unsafeVoidPromise;
            }

ensureOpen确认通道打开

       protected final boolean ensureOpen(ChannelPromise promise) {
                if (isOpen()) {
                    return true;
                }
                safeSetFailure(promise, newClosedChannelException(initialCloseCause));
                return false;
            }

safeSetSuccess设置成功回调

      protected final void safeSetSuccess(ChannelPromise promise) {
                if (!(promise instanceof VoidChannelPromise) && !promise.trySuccess()) {
                    logger.warn("Failed to mark a promise as success because it is done already: {}", promise);
                }
            }

safeSetFailure设置失败回调,带异常信息

      protected final void safeSetFailure(ChannelPromise promise, Throwable cause) {
                if (!(promise instanceof VoidChannelPromise) && !promise.tryFailure(cause)) {
                    logger.warn("Failed to mark a promise as failure because it's done already: {}", promise, cause);
                }
            }

invokeLater延时任务

其实就是提交任务给IO线程。

       private void invokeLater(Runnable task) {
                try {
                    eventLoop().execute(task);
                } catch (RejectedExecutionException e) {
                    logger.warn("Can't invoke task later as EventLoop rejected it", e);
                }
            }

NioUnsafe

这个是针对NIO的子接口:

    public interface NioUnsafe extends Unsafe {
    
            SelectableChannel ch();
    
            void finishConnect();
    
            void read();
    
            void forceFlush();
        }

AbstractNioUnsafe

继承了抽象AbstractUnsafe,实现了NioUnsafe的接口,一些基本的NIO实现都有。

202309132212307911.png

removeReadOp清除读监听

如果发现有读监听就删除了,用的是位操作。什么时候会进行读的清除呢,一般是设置自动读的,所以不会清除读监听,而且默认NioSocketChannel是监听读的。

    protected final void removeReadOp() {
                SelectionKey key = selectionKey();
                if (!key.isValid()) {
                    return;
                }
                int interestOps = key.interestOps();
                if ((interestOps & readInterestOp) != 0) {
                    key.interestOps(interestOps & ~readInterestOp);
                }
            }

202309132212320692.png

connect建立连接

如果doConnect能连接上,就处理回调fulfillConnectPromise,否则如果有设置超时的话就提交超时调度任务,如果连接上了,就把超时任务取消。

       @Override
            public final void connect(
                    final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
    
                try {
                    if (connectPromise != null) {
                        // Already a connect in process.
                        throw new ConnectionPendingException();
                    }
    
                    boolean wasActive = isActive();
                    if (doConnect(remoteAddress, localAddress)) {
                        fulfillConnectPromise(promise, wasActive);
                    } else {
                        connectPromise = promise;
                        requestedRemoteAddress = remoteAddress;
    
                        // Schedule connect timeout.
                        int connectTimeoutMillis = config().getConnectTimeoutMillis();
                        if (connectTimeoutMillis > 0) {
                            connectTimeoutFuture = eventLoop().schedule(new Runnable() {
                                @Override
                                public void run() {
                                    ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;
                                    ConnectTimeoutException cause =
                                            new ConnectTimeoutException("connection timed out: " + remoteAddress);
                                    if (connectPromise != null && connectPromise.tryFailure(cause)) {
                                        close(voidPromise());
                                    }
                                }
                            }, connectTimeoutMillis, TimeUnit.MILLISECONDS);
                        }
    
                        promise.addListener(new ChannelFutureListener() {
                            @Override
                            public void operationComplete(ChannelFuture future) throws Exception {
                                if (future.isCancelled()) {
                                    if (connectTimeoutFuture != null) {
                                        connectTimeoutFuture.cancel(false);
                                    }
                                    connectPromise = null;
                                    close(voidPromise());
                                }
                            }
                        });
                    }
                } catch (Throwable t) {
                    promise.tryFailure(annotateConnectException(t, remoteAddress));
                    closeIfClosed();
                }
            }

fulfillConnectPromise实现连接回调

连接成功,如果前面通道没激活,现在激活了,就传递激活事件。

     private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {
                if (promise == null) {
    
                    return;
                }
    
                boolean active = isActive();
    			//尝试成功回调
                boolean promiseSet = promise.trySuccess();
    
    			//前面没激活,现在激活了
                if (!wasActive && active) {
                    pipeline().fireChannelActive();
                }
    
    			//失败就关闭
                if (!promiseSet) {
                    close(voidPromise());
                }
            }

还有一个失败的回调,附带异常的。

      private void fulfillConnectPromise(ChannelPromise promise, Throwable cause) {
                if (promise == null) {
                    return;
                }
    
                promise.tryFailure(cause);
                closeIfClosed();
            }

finishConnect完成连接

连接完成了,不管是否成功都要回调,要处理。

     @Override
            public final void finishConnect() {
    
                assert eventLoop().inEventLoop();
    
                try {
                    boolean wasActive = isActive();
                    doFinishConnect();
                    fulfillConnectPromise(connectPromise, wasActive);
                } catch (Throwable t) {
                    fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));
                } finally {
    
                    if (connectTimeoutFuture != null) {
                        connectTimeoutFuture.cancel(false);
                    }
                    connectPromise = null;
                }
            }

如果处理的是连接事件的话,就会去调用这个。

202309132212326423.png

flush0发送数据

如果没有待发的就发出去,否则要留给待发的。

     @Override
            protected final void flush0() {
                if (!isFlushPending()) {//没有待刷的操作
                    super.flush0();
                }
            }

isFlushPending是否有待发送的数据

也就是注册了OP_WRITE写事件,可能是因为前面数据没发送完,所以注册了写数据,要继续发。

            private boolean isFlushPending() {
                SelectionKey selectionKey = selectionKey();
                return selectionKey.isValid() && (selectionKey.interestOps() & SelectionKey.OP_WRITE) != 0;
            }

forceFlush强制发送数据

      @Override
            public final void forceFlush() {
                super.flush0();
            }

如果有写事件,会强制刷出去,也就是把上次没发完的数据发出去。

202309132212337524.png

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文