选择器注册
上一篇讲的基本都是JNI
方面的多,这篇我们就讲Java
层的,JNI
的基本能了解就行,如果想深挖自己再百度学习好了。我们讲比较重要的注册过程,内部到底是怎么注册,select
返回的时候又是怎么处理的,先讲windows
的。
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
我们以serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
为例,用来监听客户端来的建立请求,来看看里面干了什么。最后是调用了java.nio.channels.spi.AbstractSelectableChannel的register
:
public final SelectionKey register(Selector sel, int ops, Object att)
throws ClosedChannelException
{
if ((ops & ~validOps()) != 0)//验证参数
throw new IllegalArgumentException();
if (!isOpen())//验证通道否打开
throw new ClosedChannelException();
synchronized (regLock) {//注册锁
if (isBlocking())//判断是否非阻塞模式
throw new IllegalBlockingModeException();
synchronized (keyLock) {//key锁
// re-check if channel has been closed
if (!isOpen())//在此验证通道否打开
throw new ClosedChannelException();
SelectionKey k = findKey(sel);//是否有注册过key
if (k != null) {//有的话就设置下感兴趣事件和附件
k.attach(att);
k.interestOps(ops);
} else {
// New registration 否则就重新注册一个key到选择器上
k = ((AbstractSelector)sel).register(this, ops, att);//没有就注册下
addKey(k);//保存key,便于通道关闭的时候取消注册
}
return k;
}
}
}
findKey(Selector sel)
获得当前通道中的key
的选择器与传入选择器相同的key
:
private SelectionKey findKey(Selector sel) {
assert Thread.holdsLock(keyLock);
if (keys == null)
return null;
for (int i = 0; i < keys.length; i++)
if ((keys[i] != null) && (keys[i].selector() == sel))//存在且选择器相同
return keys[i];
return null;
}
register(AbstractSelectableChannel ch, int ops, Object attachment)
选择器注册key
,sun.nio.ch.SelectorImpl
的register
:
@Override
protected final SelectionKey register(AbstractSelectableChannel ch,
int ops,
Object attachment)
{
if (!(ch instanceof SelChImpl))
throw new IllegalSelectorException();
SelectionKeyImpl k = new SelectionKeyImpl((SelChImpl)ch, this);//创建一个实现类
k.attach(attachment);
implRegister(k);//注册进选择器队列
keys.add(k);//保存key
try {
k.interestOps(ops);//设置感兴趣事件
} catch (ClosedSelectorException e) {
assert ch.keyFor(this) == null;
keys.remove(k);
k.cancel();
throw e;
}
return k;
}
interestOps(int ops)
设置key的感兴趣事件sun.nio.ch.SelectionKeyImpl
的interestOps(int)
:
@Override
public SelectionKey interestOps(int ops) {
ensureValid();
if ((ops & ~channel().validOps()) != 0)
throw new IllegalArgumentException();
int oldOps = (int) INTERESTOPS.getAndSet(this, ops);
if (ops != oldOps) {
selector.setEventOps(this);//如果事件和以前的不一样,就添加到选择器更新队列中
}
return this;
}
setEventOps(SelectionKeyImpl ski)
添加到选择器更新队列中sun.nio.ch.WindowsSelectorImpl
的setEventOps
:
@Override
public void setEventOps(SelectionKeyImpl ski) {
ensureOpen();
synchronized (updateLock) {
updateKeys.addLast(ski);//添加更新的key
}
}
基本注册流程走完了,示意图大致是这样:
windows选择器
doSelect()
直接到最后关键的地方sun.nio.ch.WindowsSelectorImpl
的doSelect
,我们来看下主要的部分:
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
...
processUpdateQueue();//根据key更新队列
processDeregisterQueue();//如果key取消了,就要从队列中注销
...
subSelector.poll();
...
processDeregisterQueue();//再次注销队列
int updated = updateSelectedKeys(action);//更新并处理key
...
return updated;
}
processUpdateQueue()
这个就是处理前面注册进去的key
,把新的key信息添加到底层的文件描述符数组中,然后再设置感兴趣事件:
private void processUpdateQueue() {
assert Thread.holdsLock(this);
synchronized (updateLock) {
SelectionKeyImpl ski;
// new registrations 新的注册key
while ((ski = newKeys.pollFirst()) != null) {
if (ski.isValid()) {
growIfNeeded();
channelArray[totalChannels] = ski;//增加新的key
ski.setIndex(totalChannels);//设置key的索引
pollWrapper.putEntry(totalChannels, ski);//设置文件描述符和相应的事件到文件描述符数组中
totalChannels++;//通道数+1
MapEntry previous = fdMap.put(ski);//添加到map中
assert previous == null;
}
}
// changes to interest ops 更新感兴趣事件
while ((ski = updateKeys.pollFirst()) != null) {
int events = ski.translateInterestOps();
int fd = ski.getFDVal();
if (ski.isValid() && fdMap.containsKey(fd)) {
int index = ski.getIndex();
assert index >= 0 && index < totalChannels;
pollWrapper.putEventOps(index, events);
}
}
}
}
processDeregisterQueue()
把一些出现异常而取消的key
删除:
protected final void processDeregisterQueue() throws IOException {
assert Thread.holdsLock(this);
assert Thread.holdsLock(publicSelectedKeys);
Set<SelectionKey> cks = cancelledKeys();//获取已经取消的keys
synchronized (cks) {
if (!cks.isEmpty()) {
Iterator<SelectionKey> i = cks.iterator();
while (i.hasNext()) {
SelectionKeyImpl ski = (SelectionKeyImpl)i.next();
i.remove();
// remove the key from the selector 执行选择中注销操作
implDereg(ski);
selectedKeys.remove(ski);//删除要操作的key
keys.remove(ski);//从选择器key集合里删除
// remove from channel's key set 从通道中删除key
deregister(ski);
SelectableChannel ch = ski.channel();
if (!ch.isOpen() && !ch.isRegistered())
((SelChImpl)ch).kill();//关闭通道
}
}
}
}
subSelector.poll()
开始进行JNI
的select
轮询了,这个poll0
前面讲过,就不多说了:
private int poll() throws IOException{ // poll for the main thread
return poll0(pollWrapper.pollArrayAddress,
Math.min(totalChannels, MAX_SELECTABLE_FDS),
readFds, writeFds, exceptFds, timeout);
}
updateSelectedKeys(action)
这个就是返回之后的处理工作,看看有没有事件:
private int updateSelectedKeys(Consumer<SelectionKey> action) {
updateCount++;
int numKeysUpdated = 0;
numKeysUpdated += subSelector.processSelectedKeys(updateCount, action);
for (SelectThread t: threads) {//如果还有其他线程,都要累加起来
numKeysUpdated += t.subSelector.processSelectedKeys(updateCount, action);
}
return numKeysUpdated;
}
processSelectedKeys(updateCount, action)
可以看到,读写异常三个集合的事件都处理了:
private int processSelectedKeys(long updateCount, Consumer<SelectionKey> action) {
int numKeysUpdated = 0;
numKeysUpdated += processFDSet(updateCount, action, readFds,
Net.POLLIN,
false);
numKeysUpdated += processFDSet(updateCount, action, writeFds,
Net.POLLCONN |
Net.POLLOUT,
false);
numKeysUpdated += processFDSet(updateCount, action, exceptFds,
Net.POLLIN |
Net.POLLCONN |
Net.POLLOUT,
true);
return numKeysUpdated;
}
processFDSet
主要的就是下面的处理,会把放进去的集合遍历一遍,取出相应的事件key
,进行准备处理,然后把数量+1,返回:
private int processFDSet(long updateCount,
Consumer<SelectionKey> action,
int[] fds, int rOps,
boolean isExceptFds)
{
int numKeysUpdated = 0;
for (int i = 1; i <= fds[0]; i++) {//遍历
int desc = fds[i];
...
MapEntry me = fdMap.get(desc);
...
SelectionKeyImpl sk = me.ski;//取出key
...
int updated = processReadyEvents(rOps, sk, action);//处理准备事件
if (updated > 0 && me.updateCount != updateCount) {
me.updateCount = updateCount;//更新数量
numKeysUpdated++;
}
}
return numKeysUpdated;
}
processReadyEvents
这个就是检查触发的事件是否是感兴趣的事件,是就会加入selectedKeys
,这个也是我们后面要去遍历的集合:
protected final int processReadyEvents(int rOps,
SelectionKeyImpl ski,
Consumer<SelectionKey> action) {
if (action != null) {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
action.accept(ski);
ensureOpen();
return 1;
}
} else {
assert Thread.holdsLock(publicSelectedKeys);
if (selectedKeys.contains(ski)) {
if (ski.translateAndUpdateReadyOps(rOps)) {
return 1;
}
} else {
ski.translateAndSetReadyOps(rOps);
if ((ski.nioReadyOps() & ski.nioInterestOps()) != 0) {
selectedKeys.add(ski);
return 1;
}
}
}
return 0;
}
至此select
的返回也处理完了。
然后我们就要进行selectedKeys
的遍历了,然后获取相应的key
做出相应的处理。
Linux选择器
doSelect()
看看Linux
的这个是怎么做的,在EPollSelectorImpl.java
中:
@Override
protected int doSelect(Consumer<SelectionKey> action, long timeout)
throws IOException
{
assert Thread.holdsLock(this);
// epoll_wait timeout is int
int to = (int) Math.min(timeout, Integer.MAX_VALUE);
boolean blocking = (to != 0);
boolean timedPoll = (to > 0);
int numEntries;
processUpdateQueue();
processDeregisterQueue();
try {
begin(blocking);
do {
long startTime = timedPoll ? System.nanoTime() : 0;
numEntries = EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
if (numEntries == IOStatus.INTERRUPTED && timedPoll) {
// timed poll interrupted so need to adjust timeout
long adjust = System.nanoTime() - startTime;
to -= TimeUnit.MILLISECONDS.convert(adjust, TimeUnit.NANOSECONDS);
if (to <= 0) {
// timeout expired so no retry
numEntries = 0;
}
}
} while (numEntries == IOStatus.INTERRUPTED);
assert IOStatus.check(numEntries);
} finally {
end(blocking);
}
processDeregisterQueue();
return processEvents(numEntries, action);
}
其实可以看到,流程大致差不多,只是用了EPoll.wait(epfd, pollArrayAddress, NUM_EPOLLEVENTS, to);
,最后处理是processEvents
。EPoll.wait
前面有讲过,就不多啰嗦了,他会把事件和对应的文件描述符都放进一个集合里都返回,直接处理就好。
processUpdateQueue
主要用了EPoll.ctl
,对红黑树进行操作,可以对结点进行EPOLL_CTL_ADD
增加,EPOLL_CTL_MOD
修改,EPOLL_CTL_DEL
删除等操作:
/**
* Process changes to the interest ops.
*/
private void processUpdateQueue() {
assert Thread.holdsLock(this);
synchronized (updateLock) {
SelectionKeyImpl ski;
while ((ski = updateKeys.pollFirst()) != null) {
if (ski.isValid()) {
int fd = ski.getFDVal();
// add to fdToKey if needed
SelectionKeyImpl previous = fdToKey.putIfAbsent(fd, ski);
assert (previous == null) || (previous == ski);
int newEvents = ski.translateInterestOps();
int registeredEvents = ski.registeredEvents();
if (newEvents != registeredEvents) {
if (newEvents == 0) {
// remove from epoll
EPoll.ctl(epfd, EPOLL_CTL_DEL, fd, 0);
} else {
if (registeredEvents == 0) {
// add to epoll
EPoll.ctl(epfd, EPOLL_CTL_ADD, fd, newEvents);
} else {
// modify events
EPoll.ctl(epfd, EPOLL_CTL_MOD, fd, newEvents);
}
}
ski.registeredEvents(newEvents);
}
}
}
}
}
processEvents
事件处理不用像windows
那样分三个集合来处理,只需要一个就可以了:
private int processEvents(int numEntries, Consumer<SelectionKey> action)
throws IOException
{
assert Thread.holdsLock(this);
boolean interrupted = false;
int numKeysUpdated = 0;
for (int i=0; i<numEntries; i++) {
long event = EPoll.getEvent(pollArrayAddress, i);
int fd = EPoll.getDescriptor(event);
if (fd == fd0) {
interrupted = true;
} else {
SelectionKeyImpl ski = fdToKey.get(fd);
if (ski != null) {
int rOps = EPoll.getEvents(event);
numKeysUpdated += processReadyEvents(rOps, ski, action);
}
}
}
if (interrupted) {
clearInterrupt();
}
return numKeysUpdated;
}
总结
本篇大致介绍了通道注册,选择器选择返回等流程,但是很多细节没讲到,这些等把这个流程搞明白了可以慢慢看,先有个大致的框架流程。现在我们知道注册的时候其实是生成SelectionKeyImpl
,然后把相关通道的文件描述符和感兴趣事件添加到底层数组里,然后底层在select的时候会拿这些去等待事件,最后返回回来,然后去遍历集合,把对应的时间添加到selectedKeys
中,之后业务逻辑再进行处理。windows
处理起来比较麻烦,Linux
处理起来就简便多了。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。