《Java 源码分析》:Java NIO 之 ServerSocketChannel
在上两篇博文中,主要从源码的角度粗略的介绍了Selector.open()和selector.select()方法的内部实现。
由于Selector和ServerSocketChannel、SocketChannel配合使用,因此,有必要了解ServerSocketChannel、SocketChannel的内部实现。本篇博文主要来看下ServerSocketChannel。
关于ServerSocketChannel类,我们主要从以下几个方法为切入点进行跟踪了解。
1、ServerSocketChannel ssc = ServerSocketChannel.open();
2、ssc.register(selector,SelectionKey.interestOps);
下面就从第一点开始
1、解析ServerSocketChannel.open()
ServerSocketChannel类中open()方法的源码如下:
public static ServerSocketChannel open() throws IOException {
return SelectorProvider.provider().openServerSocketChannel();
}
函数功能:打开一个ServerSocketChannel;
来分析ServerSocketChannel.open()方法中唯一的一行代码:SelectorProvider.provider().openServerSocketChannel()
在关于Selector中的这篇博文中我们分析过,SelectorProvider.provider()方法在windows平台下返回的是SelectorProvider 的实现类 WindowsSelectorProvider类的实例。
其中WindowsSelectorProvider、SelectorProvider类他们的之间的继承关系如下:
- WindowsSelectorProvider类的直接父类为:SelectorProviderImpl
- SelectorProviderImpl 的直接父类是 SelectorProvider。
因此SelectorProvider.provider().openServerSocketChannel()就是等效于:windowsSelectorProvider.openServerSocketChannel();而 openServerSocketChannel()方法并不是在 WindowsSelectorProvider 类中实现的,而是在其直接父类中SelectorProviderImpl类中实现的。
--------- SelectorProviderImpl --------------
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this);
}
看见这些代码是不是和Selector.open()基本类似。即ServerSocketChannel.open()方法实际上是产生了一个子类ServerSocketChannelImpl的对象实例。
既然如下,接下来来看下这个子类ServerSocketChannelImpl。
--------- ServerSocketChannelImpl.java --------
class ServerSocketChannelImpl
extends ServerSocketChannel
implements SelChImpl
构造函数
// Our file descriptor
private final FileDescriptor fd;
// fd value needed for dev/poll. This value will remain valid
// even after the value in the file descriptor object has been set to -1
private int fdVal;
ServerSocketChannelImpl(SelectorProvider sp) throws IOException {
super(sp);
this.fd = Net.serverSocket(true);
this.fdVal = IOUtil.fdVal(fd);
this.state = ST_INUSE;
}
通过以上的跟踪我们可以得到如下的结果:
ServerSocketChannel ssc = ServerSocketChannel.open();中 ssc实际指向的是其子类(ServerSocketChannelImpl)对象。
要说明的是:ServerSocketChannel ssc = ServerSocketChannel.open()创建的这个新的Channel中的Socket是最初的,必须对这个Socket通过bind方法绑定指定的地址之后才能接收连接。因此,像如下的代码我们就比较常见:
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//serverSocketChannel监听指定端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
既然是跟踪源码,自己也就把下面这行代码的背后逻辑了解了下:
//监听指定的端口号
serverSocketChannel.socket().bind(new InetSocketAddress(9999));
ServerSocketChannelImpl类中 socket()方法的代码如下:
public ServerSocket socket() {
synchronized (stateLock) {//是一个状态所
if (socket == null)
socket = ServerSocketAdaptor.create(this);
return socket;
}
}
此方法返回的是一个ServerSocket对象,其中利用同步保证了socket是一个单例。
因此:serverSocketChannel.socket().bind(new InetSocketAddress(9999));
实现的是将 ServerSocketChannel 中的 ServerSocket绑定到指定的IP地址和端口上。(从这里可以看到,ServerSocketChannel里面其实是有一个ServerSocket对象的,这里就可以理解为什么使用ServerSocketChannel和SocketChannel编写服务器端和客户端的逻辑基本与ServerSocket和Socket一致的原因)
以上就是关于 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();代码的一个内部解析。下面来看 channel.register(//参数…)的内部实现。
接着来看下 ServerSocketChannelImpl类中的 accept()方法,代码如下:
解析channel.register(Selector sel, int ops,
Object att)
从源码的角度来看下channel.register(Selector sel, int ops,
Object att)方法的内部实现,register方法是在ServerSocketChannel的父类AbstractSelectableChannel中实现的
此方法的代码如下:
public final SelectionKey register(Selector sel, int ops,
Object att)
throws ClosedChannelException
{
synchronized (regLock) {//注册需要加锁
if (!isOpen()) //检查Channel是否打开
throw new ClosedChannelException();
if ((ops & ~validOps()) != 0) //参数的有效性检查
throw new IllegalArgumentException();
if (blocking) //
throw new IllegalBlockingModeException();
SelectionKey k = findKey(sel); //得到当前通道在指定Selector上的SelectionKey
/*
如果k不为null,则说明此通道已经在Selector上注册过了,则直接将指定的ops添加进SelectionKey中即可。
如果k为null,则说明此通道还没有在Selector上注册,则需要先进行注册,然后添加SelectionKey。
*/
if (k != null) {
k.interestOps(ops);
k.attach(att);
}
if (k == null) {
// New registration
synchronized (keyLock) {
if (!isOpen())
throw new ClosedChannelException();
k = ((AbstractSelector)sel).register(this, ops, att);
addKey(k);
}
}
return k;
}
}
//函数功能:返回当前定义的通道所支持的操作集合。对于ServerSocketChannel仅仅支持"新的连接",即返回SelectionKey.OP_ACCEPT。
public final int validOps() {
return SelectionKey.OP_ACCEPT;
}
---------- AbstractSelectableChannel类 ----------
//函数功能:返回当前通道在指定的选择器Selector的SelectionKey
private SelectionKey findKey(Selector sel) {
synchronized (keyLock) {
if (keys == null)
return null;
for (int i = 0; i < keys.length; i++)
if ((keys[i] != null) && (keys[i].selector() == sel))
return keys[i];
return null;
}
}
channel.register(Selector sel, int ops, Object att)函数功能:将这个通道channel注册到指定的selector中,返回一个SelectionKey对象实例。
register这个方法在实现代码上的逻辑有以下四点:
1、首先检查通道channel是否是打开的,如果不是打开的,则抛异常,如果是打开的,则进行 2。
2、检查指定的interest集合是否是有效的。如果没效,则抛异常。否则进行 3。这里要特别强调一下:对于ServerSocketChannel仅仅支持”新的连接”,因此interest集合ops满足ops&~sectionKey.OP_ACCEPT!=0
,即对于ServerSocketChannel注册到Selector中时的事件只能包括SelectionKey.OP_ACCEPT。
3、对通道进行了阻塞模式的检查,如果不是阻塞模式,则抛异常,否则进行4.
对于第3点发现了一个比较有意思的东西:在下面这篇博文(http://ifeve.com/selectors/) 中介绍Selector时有如下一句话:与Selector一起使用时,Channel必须处于非阻塞模式下。这意味着不能将FileChannel与Selector一起使用,因为FileChannel不能切换到非阻塞模式。而套接字通道都可以。
当时看到这篇博文的时候还不太理解,在看到register()方法内部源码有这样一个检查时,立即就懂了。
if (blocking) //
throw new IllegalBlockingModeException();
并且我们在写代码时都熟悉这样一行代码:channel.configureBlocking(false)
;即在和selector使用之前,我们都将通道设置为非阻塞性的。
--------- AbstractSelectableChannel类中的 configureBlocking()方法的代码如下:
/
*
函数功能:调整通道的阻塞模式
如果被指定的阻塞模式与当前阻塞模式不同,则会调用implConfigureBlocking方法来改变阻塞模式。
在改变这一模式之前会先持有一些锁。
*
/
public final SelectableChannel configureBlocking(boolean block)
throws IOException
{
synchronized (regLock) {
if (!isOpen())
throw new ClosedChannelException();
if (blocking == block)
return this;
if (block && haveValidKeys())
throw new IllegalBlockingModeException();
implConfigureBlocking(block);
blocking = block;
}
return this;
}
AbstractSelectableChannel类 中有一个 boolean类型变量来表示此通道是阻塞性的还是非阻塞性的。默认值为 true。函数的作用就是设置AbstractSelectableChannel类中的 blocking变量设置为false。
4、得到当前通道在指定Selector上的SelectionKey,假设结果用k表示。下面对k是否为null有不同的处理。如果k不为null,则说明此通道channel已经在Selector上注册过了,则直接将指定的ops添加进SelectionKey中即可。如果k为null,则说明此通道还没有在Selector上注册,则需要先进行注册,然后为其对应的SelectionKey设置给定值ops。
上面的4点内容就是register方法内部实现过程的大体逻辑。这里有必要介绍下ServerSocketChannel中的register方法中的这行代码:
k = ((AbstractSelector)sel).register(this, ops, att);
这行代码中的sel,实际上一个 WindowsSelectorImpl对象。下面我们看下这个类中的 WindowsSelectorImpl类中的 register方法(register方法在SelectorImpl类中实现的)
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl)) //实例有效性检查
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
k.attach(attachment);
synchronized (publicKeys) {
implRegister(k);
}
k.interestOps(ops);
return k;
}
这个方法是Selector与 ServerSocketChannel产生联系的一个重点。下面来一行一行的分析上面方法中的代码
1、SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
这行代码的功能是:根据ServerSocketChannel对象和Selector对象实例来构造一个SelectionKey对象。
我们可以看下SelectionKeyImpl类。
final SelChImpl channel; // package-private
final SelectorImpl selector; // package-private
// Index for a pollfd array in Selector that this key is registered with
private int index;
private volatile int interestOps;
private int readyOps;
SelectionKeyImpl(SelChImpl ch, SelectorImpl sel) {
channel = ch;
selector = sel;
}
因此,SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);
通过这行代码我们就得到了一个SelectionKey对象实例。
2、k.attach(attachment);
这行代码的作用就是将SelectionKey对象中的attachment设置为指定值。
SelectionKey类中attach方法代码如下:
--------SelectionKey类中-----------
public final Object attach(Object ob) {
return attachmentUpdater.getAndSet(this, ob);
}
private static final AtomicReferenceFieldUpdater<SelectionKey,Object>
attachmentUpdater = AtomicReferenceFieldUpdater.newUpdater(
SelectionKey.class, Object.class, "attachment"
);
3、implRegister(k);
,这是WindowsSelectorImpl类中的一个方法。方法的具体代码如下:
private SelectionKeyImpl[] channelArray = new SelectionKeyImpl[INIT_CAP];// INIT_CAP = 8
protected void implRegister(SelectionKeyImpl ski) {
//持有关闭锁,即关闭这个Selector时需要持有该锁,这里就是为了防止正在register时此通道被关闭,因此也需要持有该锁。
synchronized (closeLock) {
if (pollWrapper == null)
throw new ClosedSelectorException();
growIfNeeded();//如果需要则扩容
//将存储了Channel的SelectionKey保存在Selector中,并将保存的位置totalChannels保存在SelectionKey中。
channelArray[totalChannels] = ski;
ski.setIndex(totalChannels);
fdMap.put(ski);
keys.add(ski); //将ski存储在Selector中的Set<SelectionKey> 中
pollWrapper.addEntry(totalChannels, ski);//功能:将ski的文件 标识符存储在指定的物理地址中
totalChannels++;
}
}
上面方法中的pollWrapper.addEntry(totalChannels, ski);
这行代码是一个关键,即将注册到Selector中的SelectionKey(代表了一个ServerSocketChannel)中保存进去了。
也来看下PollArrayWrapper类的addEntry方法。
PollArrayWrapper类
// Prepare another pollfd struct for use.
void addEntry(int index, SelectionKeyImpl ski) {
putDescriptor(index, ski.channel.getFDVal());
}
// Access methods for fd structures
void putDescriptor(int i, int fd) {
pollArray.putInt(SIZE_POLLFD * i + FD_OFFSET, fd);
}
对象 pollArray是一个 AllocatedNativeObject 对象,在 PollArrayWrapper类的构造函数中进行了初始化
其中 PollArrayWrapper类 中的两个常量如下:
private static final short FD_OFFSET = 0; // fd offset in pollfd
private static final short EVENT_OFFSET = 4; // events offset in pollfd
AllocatedNativeObject类中的putInt方法(在其父类 NativeObject中)的代码如下:
final void putInt(int offset, int value) {
unsafe.putInt(offset + address, value);
}
putInt(int offset, int value)函数功能:将文件标识符直接写入到指定的物理地址中。
以上就是关于((AbstractSelector)sel).register(this, ops, att);中的register方法的详细分析。也就分析完了channel.register(selector,ops)的整个实现逻辑
从以上我们可以得到一个结论:只要我们使用channel.register(selector,ops)语句将channel注册到指定的selector上,实际上就是保存到了pollWrapper中,而selector.select()方法实现的逻辑就是遍历这个pollWrapper,看那个通道已经准备好。
小结
分析了Selector.open()、selector.select()和channel.register(selector,ops)之后,对Java NIO的机制的原理稍微又更清晰了一点。但是还有待于继续揣摩理解。
之后也会添加一篇关于SelectionKey的相关介绍。