2023-09-13
原文作者:https://blog.csdn.net/wangwei19871103/category_9681495_2.html 原文地址: https://blog.csdn.net/wangwei19871103/article/details/104091612

选择器注册

上一篇讲的基本都是JNI方面的多,这篇我们就讲Java层的,JNI的基本能了解就行,如果想深挖自己再百度学习好了。我们讲比较重要的注册过程,内部到底是怎么注册,select返回的时候又是怎么处理的,先讲windows的。

serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);

我们以serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);为例,用来监听客户端来的建立请求,来看看里面干了什么。最后是调用了java.nio.channels.spi.AbstractSelectableChannel的register

      public final SelectionKey register(Selector sel, int ops, Object att)
            throws ClosedChannelException
        {
        	
            if ((ops & ~validOps()) != 0)//验证参数
                throw new IllegalArgumentException();
            if (!isOpen())//验证通道否打开
                throw new ClosedChannelException();
            synchronized (regLock) {//注册锁
                if (isBlocking())//判断是否非阻塞模式
                    throw new IllegalBlockingModeException();
                synchronized (keyLock) {//key锁
                    // re-check if channel has been closed
                    if (!isOpen())//在此验证通道否打开
                        throw new ClosedChannelException();
                    SelectionKey k = findKey(sel);//是否有注册过key
                    if (k != null) {//有的话就设置下感兴趣事件和附件
                        k.attach(att);
                        k.interestOps(ops);
                    } else {
                        // New registration 否则就重新注册一个key到选择器上
                        k = ((AbstractSelector)sel).register(this, ops, att);//没有就注册下
                        addKey(k);//保存key,便于通道关闭的时候取消注册
                    }
                    return k;
                }
            }
        }

findKey(Selector sel)

获得当前通道中的key的选择器与传入选择器相同的key

        private SelectionKey findKey(Selector sel) {
            assert Thread.holdsLock(keyLock);
            if (keys == null)
                return null;
            for (int i = 0; i < keys.length; i++)
                if ((keys[i] != null) && (keys[i].selector() == sel))//存在且选择器相同
                    return keys[i];
            return null;
    
        }

register(AbstractSelectableChannel ch, int ops, Object attachment)

选择器注册keysun.nio.ch.SelectorImplregister

     @Override
        protected final SelectionKey register(AbstractSelectableChannel ch,
                                              int ops,
                                              Object attachment)
        {
            if (!(ch instanceof SelChImpl))
                throw new IllegalSelectorException();
            SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);//创建一个实现类
            k.attach(attachment);
            
            implRegister(k);//注册进选择器队列
    
            keys.add(k);//保存key
            try {
                k.interestOps(ops);//设置感兴趣事件
            } catch (ClosedSelectorException e) {
                assert ch.keyFor(this) == null;
                keys.remove(k);
                k.cancel();
                throw e;
            }
            return k;
        }

interestOps(int ops)

设置key的感兴趣事件sun.nio.ch.SelectionKeyImplinterestOps(int)

        @Override
        public SelectionKey interestOps(int ops) {
            ensureValid();
            if ((ops & ~channel().validOps()) != 0)
                throw new IllegalArgumentException();
            int oldOps = (int) INTERESTOPS.getAndSet(this, ops);
            if (ops != oldOps) {
                selector.setEventOps(this);//如果事件和以前的不一样,就添加到选择器更新队列中
            }
            return this;
        }
setEventOps(SelectionKeyImpl ski)

添加到选择器更新队列中sun.nio.ch.WindowsSelectorImplsetEventOps

        @Override
        public void setEventOps(SelectionKeyImpl ski) {
            ensureOpen();
            synchronized (updateLock) {
                updateKeys.addLast(ski);//添加更新的key
            }
        }

基本注册流程走完了,示意图大致是这样:

202309132206539011.png

windows选择器

doSelect()

直接到最后关键的地方sun.nio.ch.WindowsSelectorImpldoSelect,我们来看下主要的部分:

     @Override
        protected int doSelect(Consumer<SelectionKey> action, long timeout)
            throws IOException
        {
            ...
            processUpdateQueue();//根据key更新队列
            processDeregisterQueue();//如果key取消了,就要从队列中注销
            ...
            subSelector.poll();
    		...
            processDeregisterQueue();//再次注销队列
            int updated = updateSelectedKeys(action);//更新并处理key
    		...
            return updated;
        }

processUpdateQueue()

这个就是处理前面注册进去的key,把新的key信息添加到底层的文件描述符数组中,然后再设置感兴趣事件:

     private void processUpdateQueue() {
            assert Thread.holdsLock(this);
    
            synchronized (updateLock) {
                SelectionKeyImpl ski;
    
                // new registrations 新的注册key
                while ((ski = newKeys.pollFirst()) != null) {
                    if (ski.isValid()) {
                        growIfNeeded();
                        channelArray[totalChannels] = ski;//增加新的key
                        ski.setIndex(totalChannels);//设置key的索引
                        pollWrapper.putEntry(totalChannels, ski);//设置文件描述符和相应的事件到文件描述符数组中
                        totalChannels++;//通道数+1
                        MapEntry previous = fdMap.put(ski);//添加到map中
                        assert previous == null;
                    }
                }
    
                // changes to interest ops 更新感兴趣事件
                while ((ski = updateKeys.pollFirst()) != null) {
                    int events = ski.translateInterestOps();
                    int fd = ski.getFDVal();
                    if (ski.isValid() && fdMap.containsKey(fd)) {
                        int index = ski.getIndex();
                        assert index >= 0 && index < totalChannels;
                        pollWrapper.putEventOps(index, events);
                    }
                }
            }
        }

processDeregisterQueue()

把一些出现异常而取消的key删除:

      protected final void processDeregisterQueue() throws IOException {
            assert Thread.holdsLock(this);
            assert Thread.holdsLock(publicSelectedKeys);
    
            Set<SelectionKey> cks = cancelledKeys();//获取已经取消的keys
            synchronized (cks) {
                if (!cks.isEmpty()) {
                    Iterator<SelectionKey> i = cks.iterator();
                    while (i.hasNext()) {
                        SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
                        i.remove();
    
                        // remove the key from the selector 执行选择中注销操作
                        implDereg(ski);
    
                        selectedKeys.remove(ski);//删除要操作的key
                        keys.remove(ski);//从选择器key集合里删除
    
                        // remove from channel's key set 从通道中删除key
                        deregister(ski);
    
                        SelectableChannel ch = ski.channel();
                        if (!ch.isOpen() && !ch.isRegistered())
                            ((SelChImpl)ch).kill();//关闭通道
                    }
                }
            }
        }

subSelector.poll()

开始进行JNIselect轮询了,这个poll0前面讲过,就不多说了:

            private int poll() throws IOException{ // poll for the main thread
                return poll0(pollWrapper.pollArrayAddress,
                             Math.min(totalChannels, MAX_SELECTABLE_FDS),
                             readFds, writeFds, exceptFds, timeout);
            }

updateSelectedKeys(action)

这个就是返回之后的处理工作,看看有没有事件:

    private int updateSelectedKeys(Consumer<SelectionKey> action) {
            updateCount++;
            int numKeysUpdated = 0;
            numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
            for (SelectThread t: threads) {//如果还有其他线程,都要累加起来
                numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
            }
            return numKeysUpdated;
        }

processSelectedKeys(updateCount, action)

可以看到,读写异常三个集合的事件都处理了:

       private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
                int numKeysUpdated = 0;
                numKeysUpdated += processFDSet(updateCount, action, readFds,
                                               Net.POLLIN,
                                               false);
                numKeysUpdated += processFDSet(updateCount, action, writeFds,
                                               Net.POLLCONN |
                                               Net.POLLOUT,
                                               false);
                numKeysUpdated += processFDSet(updateCount, action, exceptFds,
                                               Net.POLLIN |
                                               Net.POLLCONN |
                                               Net.POLLOUT,
                                               true);
                return numKeysUpdated;
            }
processFDSet

主要的就是下面的处理,会把放进去的集合遍历一遍,取出相应的事件key,进行准备处理,然后把数量+1,返回:

     private int processFDSet(long updateCount,
                                     Consumer<SelectionKey> action,
                                     int[] fds, int rOps,
                                     boolean isExceptFds)
            {
                int numKeysUpdated = 0;
                for (int i = 1; i <= fds[0]; i++) {//遍历
                    int desc = fds[i];
                    ...
                    MapEntry me = fdMap.get(desc);
                    ...
                    SelectionKeyImpl sk = me.ski;//取出key
    				...
                    int updated = processReadyEvents(rOps, sk, action);//处理准备事件
                    if (updated > 0 && me.updateCount != updateCount) {
                        me.updateCount = updateCount;//更新数量
                        numKeysUpdated++;
                    }
                }
                return numKeysUpdated;
            }
processReadyEvents

这个就是检查触发的事件是否是感兴趣的事件,是就会加入selectedKeys,这个也是我们后面要去遍历的集合:

    protected final int processReadyEvents(int rOps,
                                               SelectionKeyImpl ski,
                                               Consumer<SelectionKey> action) {
            if (action != null) {
                ski.translateAndSetReadyOps(rOps);
                if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                    action.accept(ski);
                    ensureOpen();
                    return 1;
                }
            } else {
                assert Thread.holdsLock(publicSelectedKeys);
                if (selectedKeys.contains(ski)) {
                    if (ski.translateAndUpdateReadyOps(rOps)) {
                        return 1;
                    }
                } else {
                    ski.translateAndSetReadyOps(rOps);
                    if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
                        selectedKeys.add(ski);
                        return 1;
                    }
                }
            }
            return 0;
        }

至此select的返回也处理完了。
然后我们就要进行selectedKeys的遍历了,然后获取相应的key做出相应的处理。

Linux选择器

doSelect()

看看Linux的这个是怎么做的,在EPollSelectorImpl.java中:

      @Override
        protected int doSelect(Consumer<SelectionKey> action, long timeout)
            throws IOException
        {
            assert Thread.holdsLock(this);
    
            // epoll_wait timeout is int
            int to = (int) Math.min(timeout, Integer.MAX_VALUE);
            boolean blocking = (to != 0);
            boolean timedPoll = (to > 0);
    
            int numEntries;
            processUpdateQueue();
            processDeregisterQueue();
            try {
                begin(blocking);
    
                do {
                    long startTime = timedPoll ? System.nanoTime() : 0;
                    numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
                    if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
                        // timed poll interrupted so need to adjust timeout
                        long adjust = System.nanoTime() - startTime;
                        to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
                        if (to <= 0) {
                            // timeout expired so no retry
                            numEntries = 0;
                        }
                    }
                } while (numEntries == IOStatus.INTERRUPTED);
                assert IOStatus.check(numEntries);
    
            } finally {
                end(blocking);
            }
            processDeregisterQueue();
            return processEvents(numEntries, action);
        }

其实可以看到,流程大致差不多,只是用了EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);,最后处理是processEventsEPoll.wait前面有讲过,就不多啰嗦了,他会把事件和对应的文件描述符都放进一个集合里都返回,直接处理就好。

processUpdateQueue

主要用了EPoll.ctl,对红黑树进行操作,可以对结点进行EPOLL_CTL_ADD增加,EPOLL_CTL_MOD修改,EPOLL_CTL_DEL删除等操作:

      /**
         * Process changes to the interest ops.
         */
        private void processUpdateQueue() {
            assert Thread.holdsLock(this);
    
            synchronized (updateLock) {
                SelectionKeyImpl ski;
                while ((ski = updateKeys.pollFirst()) != null) {
                    if (ski.isValid()) {
                        int fd = ski.getFDVal();
                        // add to fdToKey if needed
                        SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
                        assert (previous == null) || (previous == ski);
    
                        int newEvents = ski.translateInterestOps();
                        int registeredEvents = ski.registeredEvents();
                        if (newEvents != registeredEvents) {
                            if (newEvents == 0) {
                                // remove from epoll
                                EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
                            } else {
                                if (registeredEvents == 0) {
                                    // add to epoll
                                    EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents);
                                } else {
                                    // modify events
                                    EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents);
                                }
                            }
                            ski.registeredEvents(newEvents);
                        }
                    }
                }
            }
        }

processEvents

事件处理不用像windows那样分三个集合来处理,只需要一个就可以了:

    private int processEvents(int numEntries, Consumer<SelectionKey> action)
            throws IOException
        {
            assert Thread.holdsLock(this);
    
            boolean interrupted = false;
            int numKeysUpdated = 0;
            for (int i=0; i<numEntries; i++) {
                long event = EPoll.getEvent(pollArrayAddress, i);
                int fd = EPoll.getDescriptor(event);
                if (fd == fd0) {
                    interrupted = true;
                } else {
                    SelectionKeyImpl ski = fdToKey.get(fd);
                    if (ski != null) {
                        int rOps = EPoll.getEvents(event);
                        numKeysUpdated += processReadyEvents(rOps, ski, action);
                    }
                }
            }
    
            if (interrupted) {
                clearInterrupt();
            }
    
            return numKeysUpdated;
        }

总结

本篇大致介绍了通道注册,选择器选择返回等流程,但是很多细节没讲到,这些等把这个流程搞明白了可以慢慢看,先有个大致的框架流程。现在我们知道注册的时候其实是生成SelectionKeyImpl,然后把相关通道的文件描述符和感兴趣事件添加到底层数组里,然后底层在select的时候会拿这些去等待事件,最后返回回来,然后去遍历集合,把对应的时间添加到selectedKeys中,之后业务逻辑再进行处理。windows处理起来比较麻烦,Linux处理起来就简便多了。

好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。

阅读全文