2023-01-14
原文作者: HelloWorld_EE 原文地址:https://blog.csdn.net/u010412719/category_6159934_2.html

《Java 源码分析》:Java NIO 之 Selector(第二部分selector.select())

上篇博文《Java 源码分析》:Java NIO 之 Selector(第一部分Selector.open())从源码的角度主要介绍了Selector.open()这个方法背后主要做了什么,发生了什么。

本篇就是第二部分:从源码的角度来看下selector.select()背后做了些什么,怎么做的

在看这篇博文之前,希望你已经阅读了上一篇博文:《Java 源码分析》:Java NIO 之 Selector(第一部分Selector.open())。因为这篇博文是在上篇博文的基础上来进行介绍的。

为了更好的方便理解这篇博文所介绍的内容,我们先来回顾下上篇博文中所介绍的内容。

Selector selector = Selector.open();这行代码简单来说:实例化了一个WindowSelectorImpl类的对象。并且在windows下通过两个链接的socketChannel实现了Pipe。

知道上面一点知识就更加方便的来理解了。下面开始详细的介绍。

selector.select() 介绍

selector.select()在Selector类中此方法是一个抽象的。

如下:

    public abstract int select() throws IOException;

函数功能(根据源码上的注释翻译而来):选择一些I/O操作已经准备好的管道。每个管道对应着一个key。这个方法 是一个阻塞的选择操作。当至少有一个通道被选择时才返回。当这个方法被执行时,当前线程是允许被中断的。

除了这个方法,还有两个重载方法,如下

1、select(long timeout)

    public abstract int select(long timeout)
        throws IOException;

select(long timeout)和select()一样,除了最长会阻塞timeout毫秒(参数)。

这个方法并不能提供精确时间的保证,和当执行wait(long timeout)方法时并不能保证会延时timeout道理一样。

这里的timeout说明如下:

  • 如果 timeout为正,则select(long timeout)在等待有通道被选择时至多会阻塞timeout毫秒
  • 如果timeout为零,则永远阻塞直到有至少一个通道准备就绪。
  • timeout不能为负数

2、selectNow()

    public abstract int selectNow() throws IOException;

这个方法与select()的区别在于,是非阻塞的,即当前操作即使没有通道准备好也是立即返回。只是返回的是0.

值得注意的是:调用这个方法会清除所有之前执行了wakeup方法的作用。

以上就是select()以及其两个重载方法的一点说明。

下面来看select()方法在其子类的具体实现

在上篇博文中我们通过源码的角度知道Selector selector = Selector.open();代码中的selector实际是指向的是其子类WindowsSelectorImpl 的对象实例。

因此在 我们执行 selector.select()方法时,实际上时调用的是 WindowsSelectorImpl 中的select()方法

在找这个函数的时候,觉得有必要说下Selector, WindowsSelectorImpl 之间的继承关系:

  • WindowsSelectorImpl 的直接父类是 SelectorImpl(select方法在这个里面实现);
  • SelectorImpl 的直接父类是 AbstractSelector
  • AbstractSelector 的直接父类是 Selector.

以上就是他们的继承关系,其中select()方法是在 SelectorImpl类中进行实现的。

——SelectorImpl.java中的部分代码如下———–

        public int select(long timeout)
            throws IOException
        {
            if (timeout < 0)
                throw new IllegalArgumentException("Negative timeout");
            return lockAndDoSelect((timeout == 0) ? -1 : timeout);
        }
    
        public int select() throws IOException {
            return select(0);
        }
    
        public int selectNow() throws IOException {
            return lockAndDoSelect(0);
        }

其中select()方法调用了select(timeout)方法,只是timeout=0.

以上三个函数都是调用了 lockAndDoSelect()方法(此方法也是在SelectorImpl中实现的)

        private int lockAndDoSelect(long timeout) throws IOException {
            synchronized (this) {
                if (!isOpen()) //检查这个Selector是否打开
                    throw new ClosedSelectorException();
                //双重锁
                //publicKeys和 publicSelectedKeys的定义如下:
              //private Set<SelectionKey> publicKeys;             // Immutable
              //private Set<SelectionKey> publicSelectedKeys;     // Removal allowed, but not addition
                synchronized (publicKeys) {
                    synchronized (publicSelectedKeys) {
                        return doSelect(timeout);
                    }
                }
            }
        }

1、doSelect(timeout)方法在第2点中进行说明,这里先看下这个isOpen()函数的具体实现,这个函数是在 AbstractSelector 中实现的。

    public final boolean isOpen() {
        return selectorOpen.get();//
    }

函数的功能:检查这个Selector是否打开;
在isOpen()方法体中的selectorOpen变量在AbstractSelector类中定义如下:

    private AtomicBoolean selectorOpen = new AtomicBoolean(true);

即selectorOpen是一个原子性的变量;如果在执行selector.select()方法之前执行了Selector selector = Selector.open();则selectorOpen就进行了初始化,为true。否则为false。

2、doSelect(timeout)方法的介绍

这个是lockAndDoSelect()方法中的关键,下面具体来看

        protected int doSelect(long timeout) throws IOException {
            if (channelArray == null)
                throw new ClosedSelectorException();
            this.timeout = timeout; // set selector timeout
            processDeregisterQueue();
            if (interruptTriggered) {
                resetWakeupSocket();
                return 0;
            }
            // Calculate number of helper threads needed for poll. If necessary
            // threads are created here and start waiting on startLock
            adjustThreadsCount();
            finishLock.reset(); // reset finishLock
            // Wakeup helper threads, waiting on startLock, so they start polling.
            // Redundant threads will exit here after wakeup.
            startLock.startThreads();
            // do polling in the main thread. Main thread is responsible for
            // first MAX_SELECTABLE_FDS entries in pollArray.
            try {
                begin();
                try {
                    subSelector.poll();
                } catch (IOException e) {
                    finishLock.setException(e); // Save this exception
                }
                // Main thread is out of poll(). Wakeup others and wait for them
                if (threads.size() > 0)
                    finishLock.waitForHelperThreads();
              } finally {
                  end();
              }
            // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
            finishLock.checkForException();
            processDeregisterQueue();
            int updated = updateSelectedKeys();
            // Done with poll(). Set wakeupSocket to nonsignaled  for the next run.
            resetWakeupSocket();
            return updated;
        }

这个方法的代码还是比较复杂的哈(这次就不详细的对每行代码进行一个分析了哈,以后遇到问题了再详细的来理解分析),但是我们首先要关注一点,就是subSelector.poll()这行代码,这个是一个核心,也就是轮训pollWrapper中保存的FD;具体实现是调用native方法poll0:

SubSelector 是 WindowsSelectorImpl的一个内部类。

SubSelector 类中的poll()方法源码如下:

            // These arrays will hold result of native select().
            // The first element of each array is the number of selected sockets.
            // Other elements are file descriptors of selected sockets.
            private final int[] readFds = new int [MAX_SELECTABLE_FDS + 1];//readFds保存发生read的FD
            private final int[] writeFds = new int [MAX_SELECTABLE_FDS + 1];//writeFds保存发生写的FD
            private final int[] exceptFds = new int [MAX_SELECTABLE_FDS + 1];//exceptFds保存发生except的FD
    
            private int poll() throws IOException{ // poll for the main thread
                return poll0(pollWrapper.pollArrayAddress,
                             Math.min(totalChannels, MAX_SELECTABLE_FDS),
                             readFds, writeFds, exceptFds, timeout);
            }

其中poll0是一个native方法

              /*
              参数说明:readFds保存发生read的FD,writeFds保存发生写的FD,
                        exceptFds保存发生except的FD
               */
             private native int poll0(long pollAddress, int numfds,
                 int[] readFds, int[] writeFds, int[] exceptFds, long timeout);

这个poll0()会监听pollWrapper中的FD有没有数据进出,这会造成IO阻塞,直到有数据读写事件发生。
比如,由于pollWrapper中保存的也有ServerSocketChannel的FD(在上篇博文中提到),所以只要ClientSocket发一份数据到ServerSocket,那么poll0()就会返回;
又由于pollWrapper中保存的也有pipe的write端的FD,所以只要pipe的write端向FD发一份数据,也会造成poll0()返回;
如果这两种情况都没有发生,那么poll0()就一直阻塞,也就是selector.select()会一直阻塞;如果有任何一种情况发生,那么selector.select()就会返回,
所有在OperationServer的run()里要用while (true) {,这样就可以保证在selector接收到数据并处理完后继续监听poll();

WindowsSelectorImpl.wakeup()

看完了select()方法的内部实现思路,最后来看下:WindowsSelectorImpl.wakeup()的具体实现.

wakeup()方法源码如下:

        public Selector wakeup() {
            synchronized (interruptLock) {
                if (!interruptTriggered) {
                    setWakeupSocket();
                    interruptTriggered = true;
                }
            }
            return this;
        }
        // Sets Windows wakeup socket to a signaled state.
        private void setWakeupSocket() {
            setWakeupSocket0(wakeupSinkFd);
        }
        private native void setWakeupSocket0(int wakeupSinkFd);

native实现摘要:

——-WindowsSelectorImpl.c —-

        Java_sun_nio_ch_WindowsSelectorImpl_setWakeupSocket0(JNIEnv *env, jclass this,
                                                        jint scoutFd)
        {
            /* Write one byte into the pipe */
            send(scoutFd, (char*)&POLLIN, 1, 0);
        }

这里完成了向最开始建立的pipe的sink端写入了一个字节,source文件描述符就会处于就绪状态,poll方法会返回,从而导致select方法返回。(原来自己建立一个socket链着自己另外一个socket就是为了干这事)

sun.nio.ch包下面所有的类库可以在这里看到:http://www.docjar.com/docs/api/sun/nio/ch/package-index.html

小结

关于selector.select()方法中的脉络就这样的顺了一遍,还是有很多的细节自己由于水平的原因没有理解清楚,如有错误,请批评指正。

由于目前自己从源码的角度只看了Selector.open()方法和selector.select()方法的内部实现。还有以下几块内容有待自己去理解:

1、Channel 类中的 register()方法,其中涉及到ServerSocketChannel 类和SocketChannel类。

2、selector.selectedKey()方法返回的Set集合中的值是何时添加进去的,以及SelectionKey类相关的一些操作的具体实现。

这就是两块比较重要内容,这对理解整个的Java NIO有很大帮助,因此,自己也会在最近的时间来啃以上相关的内容。

最后,还是贴上下面的框图。

102802845.jpg

阅读全文