2023-01-14  阅读(322)
原文作者: HelloWorld_EE 原文地址:https://blog.csdn.net/u010412719/category_6159934_2.html

《Java 源码分析》:Java NIO 之 ServerSocketChannel

在上两篇博文中,主要从源码的角度粗略的介绍了Selector.open()和selector.select()方法的内部实现。

由于Selector和ServerSocketChannel、SocketChannel配合使用,因此,有必要了解ServerSocketChannel、SocketChannel的内部实现。本篇博文主要来看下ServerSocketChannel。

关于ServerSocketChannel类,我们主要从以下几个方法为切入点进行跟踪了解。

1、ServerSocketChannel ssc = ServerSocketChannel.open();

2、ssc.register(selector,SelectionKey.interestOps);

下面就从第一点开始

1、解析ServerSocketChannel.open()

ServerSocketChannel类中open()方法的源码如下:

        public static ServerSocketChannel open() throws IOException {
            return SelectorProvider.provider().openServerSocketChannel();
        }

函数功能:打开一个ServerSocketChannel;

来分析ServerSocketChannel.open()方法中唯一的一行代码:SelectorProvider.provider().openServerSocketChannel()

在关于Selector中的这篇博文中我们分析过,SelectorProvider.provider()方法在windows平台下返回的是SelectorProvider 的实现类 WindowsSelectorProvider类的实例。

其中WindowsSelectorProvider、SelectorProvider类他们的之间的继承关系如下:

  • WindowsSelectorProvider类的直接父类为:SelectorProviderImpl
  • SelectorProviderImpl 的直接父类是 SelectorProvider。

因此SelectorProvider.provider().openServerSocketChannel()就是等效于:windowsSelectorProvider.openServerSocketChannel();而 openServerSocketChannel()方法并不是在 WindowsSelectorProvider 类中实现的,而是在其直接父类中SelectorProviderImpl类中实现的。

        --------- SelectorProviderImpl --------------
    
        public ServerSocketChannel openServerSocketChannel() throws IOException {
            return new ServerSocketChannelImpl(this);
        }

看见这些代码是不是和Selector.open()基本类似。即ServerSocketChannel.open()方法实际上是产生了一个子类ServerSocketChannelImpl的对象实例。

既然如下,接下来来看下这个子类ServerSocketChannelImpl。

    ---------  ServerSocketChannelImpl.java --------
    
    
        class ServerSocketChannelImpl
            extends ServerSocketChannel
            implements SelChImpl
    
        构造函数
        // Our file descriptor
        private final FileDescriptor fd;
    
        // fd value needed for dev/poll. This value will remain valid
        // even after the value in the file descriptor object has been set to -1
        private int fdVal;
    
        ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
            super(sp);
            this.fd =  Net.serverSocket(true);
            this.fdVal = IOUtil.fdVal(fd);
            this.state = ST_INUSE;
        }

通过以上的跟踪我们可以得到如下的结果:

ServerSocketChannel ssc = ServerSocketChannel.open();中 ssc实际指向的是其子类(ServerSocketChannelImpl)对象。

要说明的是:ServerSocketChannel ssc = ServerSocketChannel.open()创建的这个新的Channel中的Socket是最初的,必须对这个Socket通过bind方法绑定指定的地址之后才能接收连接。因此,像如下的代码我们就比较常见:

                    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                    //serverSocketChannel监听指定端口
                    serverSocketChannel.socket().bind(new InetSocketAddress(port));

既然是跟踪源码,自己也就把下面这行代码的背后逻辑了解了下:

            //监听指定的端口号
            serverSocketChannel.socket().bind(new InetSocketAddress(9999));

ServerSocketChannelImpl类中 socket()方法的代码如下:

        public ServerSocket socket() {
            synchronized (stateLock) {//是一个状态所
    
                if (socket == null)
                    socket = ServerSocketAdaptor.create(this);
                return socket;
            }
        }

此方法返回的是一个ServerSocket对象,其中利用同步保证了socket是一个单例。

因此:serverSocketChannel.socket().bind(new InetSocketAddress(9999));
实现的是将 ServerSocketChannel 中的 ServerSocket绑定到指定的IP地址和端口上。(从这里可以看到,ServerSocketChannel里面其实是有一个ServerSocket对象的,这里就可以理解为什么使用ServerSocketChannel和SocketChannel编写服务器端和客户端的逻辑基本与ServerSocket和Socket一致的原因)

以上就是关于 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();代码的一个内部解析。下面来看 channel.register(//参数…)的内部实现。

接着来看下 ServerSocketChannelImpl类中的 accept()方法,代码如下:

解析channel.register(Selector sel, int ops,
Object att)

从源码的角度来看下channel.register(Selector sel, int ops,
Object att)方法的内部实现,register方法是在ServerSocketChannel的父类AbstractSelectableChannel中实现的

此方法的代码如下:

        public final SelectionKey register(Selector sel, int ops,
                                           Object att)
            throws ClosedChannelException
        {
            synchronized (regLock) {//注册需要加锁
                if (!isOpen()) //检查Channel是否打开
                    throw new ClosedChannelException();
                if ((ops & ~validOps()) != 0) //参数的有效性检查
                    throw new IllegalArgumentException();
                if (blocking) //
                    throw new IllegalBlockingModeException();
                SelectionKey k = findKey(sel); //得到当前通道在指定Selector上的SelectionKey
                /*
                    如果k不为null,则说明此通道已经在Selector上注册过了,则直接将指定的ops添加进SelectionKey中即可。
                    如果k为null,则说明此通道还没有在Selector上注册,则需要先进行注册,然后添加SelectionKey。
                */
                if (k != null) {
                    k.interestOps(ops);
                    k.attach(att);
                }
                if (k == null) {
                    // New registration
                    synchronized (keyLock) {
                        if (!isOpen())
                            throw new ClosedChannelException();
                        k = ((AbstractSelector)sel).register(this, ops, att);
                        addKey(k);
                    }
                }
                return k;
            }
        }
        //函数功能:返回当前定义的通道所支持的操作集合。对于ServerSocketChannel仅仅支持"新的连接",即返回SelectionKey.OP_ACCEPT。
        public final int validOps() {
            return SelectionKey.OP_ACCEPT;
        }
    ---------- AbstractSelectableChannel类 ----------
        //函数功能:返回当前通道在指定的选择器Selector的SelectionKey
        private SelectionKey findKey(Selector sel) {
            synchronized (keyLock) {
                if (keys == null)
                    return null;
                for (int i = 0; i < keys.length; i++)
                    if ((keys[i] != null) && (keys[i].selector() == sel))
                        return keys[i];
                return null;
            }
        }

channel.register(Selector sel, int ops, Object att)函数功能:将这个通道channel注册到指定的selector中,返回一个SelectionKey对象实例。

register这个方法在实现代码上的逻辑有以下四点:

1、首先检查通道channel是否是打开的,如果不是打开的,则抛异常,如果是打开的,则进行 2。

2、检查指定的interest集合是否是有效的。如果没效,则抛异常。否则进行 3。这里要特别强调一下:对于ServerSocketChannel仅仅支持”新的连接”,因此interest集合ops满足ops&~sectionKey.OP_ACCEPT!=0,即对于ServerSocketChannel注册到Selector中时的事件只能包括SelectionKey.OP_ACCEPT。

3、对通道进行了阻塞模式的检查,如果不是阻塞模式,则抛异常,否则进行4.

对于第3点发现了一个比较有意思的东西:在下面这篇博文(http://ifeve.com/selectors/) 中介绍Selector时有如下一句话:与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。

当时看到这篇博文的时候还不太理解,在看到register()方法内部源码有这样一个检查时,立即就懂了。

            if (blocking) //
                throw new IllegalBlockingModeException();

并且我们在写代码时都熟悉这样一行代码:channel.configureBlocking(false);即在和selector使用之前,我们都将通道设置为非阻塞性的。

        ---------  AbstractSelectableChannel类中的 configureBlocking()方法的代码如下:
        /
        *
            函数功能:调整通道的阻塞模式
            如果被指定的阻塞模式与当前阻塞模式不同,则会调用implConfigureBlocking方法来改变阻塞模式。
            在改变这一模式之前会先持有一些锁。
        *
        /
        public final SelectableChannel configureBlocking(boolean block)
            throws IOException
        {
            synchronized (regLock) {
                if (!isOpen())
                    throw new ClosedChannelException();
                if (blocking == block)
                    return this;
                if (block && haveValidKeys())
                    throw new IllegalBlockingModeException();
                implConfigureBlocking(block);
                blocking = block;
            }
            return this;
        }

AbstractSelectableChannel类 中有一个 boolean类型变量来表示此通道是阻塞性的还是非阻塞性的。默认值为 true。函数的作用就是设置AbstractSelectableChannel类中的 blocking变量设置为false。

4、得到当前通道在指定Selector上的SelectionKey,假设结果用k表示。下面对k是否为null有不同的处理。如果k不为null,则说明此通道channel已经在Selector上注册过了,则直接将指定的ops添加进SelectionKey中即可。如果k为null,则说明此通道还没有在Selector上注册,则需要先进行注册,然后为其对应的SelectionKey设置给定值ops。

上面的4点内容就是register方法内部实现过程的大体逻辑。这里有必要介绍下ServerSocketChannel中的register方法中的这行代码:

    k = ((AbstractSelector)sel).register(this, ops, att);

这行代码中的sel,实际上一个 WindowsSelectorImpl对象。下面我们看下这个类中的 WindowsSelectorImpl类中的 register方法(register方法在SelectorImpl类中实现的)

        protected final SelectionKey register(AbstractSelectableChannel ch,
                                              int ops,
                                              Object attachment)
        {
            if (!(ch instanceof SelChImpl)) //实例有效性检查
                throw new IllegalSelectorException();
            SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
            k.attach(attachment);
            synchronized (publicKeys) {
                implRegister(k);
            }
            k.interestOps(ops);
            return k;
        }

这个方法是Selector与 ServerSocketChannel产生联系的一个重点。下面来一行一行的分析上面方法中的代码

1、SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);这行代码的功能是:根据ServerSocketChannel对象和Selector对象实例来构造一个SelectionKey对象。

我们可以看下SelectionKeyImpl类。

        final SelChImpl channel;                            // package-private
        final SelectorImpl selector;                        // package-private
    
        // Index for a pollfd array in Selector that this key is registered with
        private int index;
    
        private volatile int interestOps;
        private int readyOps;
    
        SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
            channel = ch;
            selector = sel;
        }

因此,SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);通过这行代码我们就得到了一个SelectionKey对象实例。

2、k.attach(attachment);这行代码的作用就是将SelectionKey对象中的attachment设置为指定值。

SelectionKey类中attach方法代码如下:

        --------SelectionKey类中-----------
        public final Object attach(Object ob) {
            return attachmentUpdater.getAndSet(this, ob);
        }
        private static final AtomicReferenceFieldUpdater<SelectionKey,Object>
            attachmentUpdater = AtomicReferenceFieldUpdater.newUpdater(
                SelectionKey.class, Object.class, "attachment"
            );

3、implRegister(k);,这是WindowsSelectorImpl类中的一个方法。方法的具体代码如下:

    
    
        private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];// INIT_CAP = 8
    
        protected void implRegister(SelectionKeyImpl ski) { 
            //持有关闭锁,即关闭这个Selector时需要持有该锁,这里就是为了防止正在register时此通道被关闭,因此也需要持有该锁。
            synchronized (closeLock) {
                if (pollWrapper == null)
                    throw new ClosedSelectorException();
                growIfNeeded();//如果需要则扩容
                //将存储了Channel的SelectionKey保存在Selector中,并将保存的位置totalChannels保存在SelectionKey中。
                channelArray[totalChannels] = ski;
                ski.setIndex(totalChannels);
    
                fdMap.put(ski);
                keys.add(ski);  //将ski存储在Selector中的Set<SelectionKey> 中
                pollWrapper.addEntry(totalChannels, ski);//功能:将ski的文件 标识符存储在指定的物理地址中
                totalChannels++;
            }
        }

上面方法中的pollWrapper.addEntry(totalChannels, ski);这行代码是一个关键,即将注册到Selector中的SelectionKey(代表了一个ServerSocketChannel)中保存进去了。

也来看下PollArrayWrapper类的addEntry方法。

PollArrayWrapper类

        // Prepare another pollfd struct for use.
        void addEntry(int index, SelectionKeyImpl ski) {
            putDescriptor(index, ski.channel.getFDVal());
        }
    
        // Access methods for fd structures
        void putDescriptor(int i, int fd) {
            pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
        }

对象 pollArray是一个 AllocatedNativeObject 对象,在 PollArrayWrapper类的构造函数中进行了初始化
其中 PollArrayWrapper类 中的两个常量如下:

        private static final short FD_OFFSET     = 0; // fd offset in pollfd
        private static final short EVENT_OFFSET  = 4; // events offset in pollfd

AllocatedNativeObject类中的putInt方法(在其父类 NativeObject中)的代码如下:

       final void putInt(int offset, int value) {
            unsafe.putInt(offset + address, value);
        }

putInt(int offset, int value)函数功能:将文件标识符直接写入到指定的物理地址中。

以上就是关于((AbstractSelector)sel).register(this, ops, att);中的register方法的详细分析。也就分析完了channel.register(selector,ops)的整个实现逻辑

从以上我们可以得到一个结论:只要我们使用channel.register(selector,ops)语句将channel注册到指定的selector上,实际上就是保存到了pollWrapper中,而selector.select()方法实现的逻辑就是遍历这个pollWrapper,看那个通道已经准备好。

小结

分析了Selector.open()、selector.select()和channel.register(selector,ops)之后,对Java NIO的机制的原理稍微又更清晰了一点。但是还有待于继续揣摩理解。

之后也会添加一篇关于SelectionKey的相关介绍。

阅读全文
  • 点赞