NioMessageUnsafe
这个是专门来处理客户端连接的unsafe
。
read读客户端连接
上篇分析了可调节的接收缓冲区,这篇就来看看他是怎么用的,所以就要看NioMessageUnsafe
和NioByteUnsafe
的read
,我们先介绍NioMessageUnsafe
的read
。这个是用来接收客户端连接的,读取的是客户端通道。allocHandle
的操作上篇文章里都讲了,就不多说了。这里是把所有的客户端连接接受完了,然后传递每一个连接的读事件。
@Override
public void read() {
assert eventLoop().inEventLoop();
final ChannelConfig config = config();//获取通道的配置
final ChannelPipeline pipeline = pipeline();//获取管道
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();//获取接受缓冲区处理器
allocHandle.reset(config);//重置配置参数
boolean closed = false;
Throwable exception = null;
try {
try {
do {
int localRead = doReadMessages(readBuf);//读取消息到readBuf中
if (localRead == 0) {//没有消息
break;
}
if (localRead < 0) {//关闭是-1
closed = true;
break;
}
allocHandle.incMessagesRead(localRead);//读取消息数+1
} while (allocHandle.continueReading());
} catch (Throwable t) {
exception = t;
}
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
pipeline.fireChannelRead(readBuf.get(i));//传播读事件,传递的是客户端通道
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();//传播读完成事件
if (exception != null) {
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}
if (closed) {
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
}
}
} finally {
if (!readPending && !config.isAutoRead()) {//不自动读了,就删除读事件监听,一般读完成后,都设置成自动读的
removeReadOp();
}
}
}
NioServerSocketChannel的doReadMessages
可以看到这里准备去接受一个客户端。
内部其实就是用ServerSocketChannel
去接受一个SocketChannel
啦。
然后放进一个list
中, 返回1
。如果没接受到,那就返回0
。后面就开始把这个通道传递下去了:
NioByteUnsafe
这个是专门来处理客户端读数据的。
read读客户端数据
其实逻辑和读客户端连接差不多,只不过这次是需要接收缓冲区分配器分配缓冲区来接受读的信息。然后接受一次传递一次读事件。这里为什么要读一次传一次,而不是全部读完再传呢,我想可能是因为这样读一部分处理一分部分,处理完了就释放内存,可以提高吞吐量,而且也不至于数据有积压,可能造成内存溢出。
@Override
public final void read() {
final ChannelConfig config = config();
if (shouldBreakReadReady(config)) {
clearReadPending();
return;
}
final ChannelPipeline pipeline = pipeline();
final ByteBufAllocator allocator = config.getAllocator();
final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
allocHandle.reset(config);
ByteBuf byteBuf = null;
boolean close = false;
try {
do {
byteBuf = allocHandle.allocate(allocator);//分配缓冲区
allocHandle.lastBytesRead(doReadBytes(byteBuf));//读取通道内数据到缓冲区,如果是客户端断开,读取数据个数是-1
if (allocHandle.lastBytesRead() <= 0) {//如果没读到数据0 或者客户端关闭-1
// nothing was read. release the buffer.
byteBuf.release();
byteBuf = null;
close = allocHandle.lastBytesRead() < 0;//-1
if (close) {//把通道关闭,也就是没发数据来
// There is nothing left to read as we received an EOF.
readPending = false;
}
break;
}
allocHandle.incMessagesRead(1);//增加读的消息数
readPending = false;
pipeline.fireChannelRead(byteBuf);//管道里传递消息
byteBuf = null;
} while (allocHandle.continueReading());
allocHandle.readComplete();
pipeline.fireChannelReadComplete();//读取完成
if (close) {
closeOnRead(pipeline);
}
} catch (Throwable t) {
handleReadException(pipeline, byteBuf, t, close, allocHandle);
} finally {
if (!readPending && !config.isAutoRead()) {//如果还要继续读的话,就不会删除读监听
removeReadOp();
}
}
}
handleReadException处理读异常
如果出现了读的异常,因为有缓冲区申请在那边,如果缓冲区有数据,那就先把数据传递下去,否则就直接释放,然后还是按普通逻辑去统计,传递读完成事件,最后传递异常事件。
private void handleReadException(ChannelPipeline pipeline, ByteBuf byteBuf, Throwable cause, boolean close,
RecvByteBufAllocator.Handle allocHandle) {
if (byteBuf != null) {
if (byteBuf.isReadable()) {//如果有可读的话,把传递消息
readPending = false;
pipeline.fireChannelRead(byteBuf);
} else {
byteBuf.release();
}
}
allocHandle.readComplete();
pipeline.fireChannelReadComplete();//读完成
pipeline.fireExceptionCaught(cause);//传递异常
if (close || cause instanceof IOException) {//如果是IO异常
closeOnRead(pipeline);//传递自定义事件
}
}
closeOnRead读关闭(半关闭)
这里可能要涉及一个半关闭
的概念,简单点就是说我们客户端和服务端的socket
其实都有两个缓冲区
,即读缓冲区和写缓冲区
,如果TCP
要分手了,客户端就会先发送关闭请求,然后把写缓冲区关闭了,就不写了,但是这个时候他是可以读客户端的消息的,即读缓冲区还没关,所以叫做半关闭。所以这里就是判断是否是把读缓冲区关了,如果关了,就直接传递自定义消息,否则就判断是否配置了半关闭,是的话就进行读关闭,传递自定义消息,否则就关闭通道了。
private void closeOnRead(ChannelPipeline pipeline) {
if (!isInputShutdown0()) {//单方关闭输出流,但是可以接受输入,即半关闭
if (isAllowHalfClosure(config())) {//是否配置了半关闭
shutdownInput();//关闭输入流
pipeline.fireUserEventTriggered(ChannelInputShutdownEvent.INSTANCE);
} else {
close(voidPromise());
}
} else {
inputClosedSeenErrorOnRead = true;
pipeline.fireUserEventTriggered(ChannelInputShutdownReadComplete.INSTANCE);//传递自定义事件
}
}
TCP关闭简单举例
其实简单点来说就是,2
个人打电话,A
说,我说完了( A
写关闭 ),B
还有话的话就说,如果也说完了,就跟A
说,我也说完了( B
写关闭 ).A收到后,说好的,再见( A
读关闭 ),B
听到后说,再见( B
读关闭 )。至此,双方的读写都关闭了。不知道我这么描述对不对哈哈。
NioSocketChannelUnsafe
这个是NioSocketChannel
的unsafe
。主要是prepareToClose方法的覆盖,取消注册事件,返回全局唯一的GlobalEventExecutor
。
private final class NioSocketChannelUnsafe extends NioByteUnsafe {
@Override
protected Executor prepareToClose() {
try {
if (javaChannel().isOpen() && config().getSoLinger() > 0) {
doDeregister();
return GlobalEventExecutor.INSTANCE;
}
} catch (Throwable ignore) {
}
return null;
}
}
至此unsafe
基本都了解了,但是具体流程怎么工作的,还是要自己debug
一下。
好了,今天就到这里了,希望对学习理解有帮助,大神看见勿喷,仅为自己的学习理解,能力有限,请多包涵。