2022-08-10  阅读(528)
原文作者:keep_trying_gogo 原文地址:https://blog.csdn.net/yjp198713/category_9271466.html

一、异步IO模型

异步IO则采用“订阅-通知”模式: 即应用程序向操作系统注册IO监听,然后继续做自己的事情。当操作系统发生IO事件,并且准备好数据后,在主动通知应用程序,触发相应的函数

202202131612535341.png

  • 和同步IO一样,异步IO也是由操作系统进行支持的。微软的windows系统提供了一种异步IO技术:IOCP(I/O CompletionPort,I/O完成端口);
  • Linux下由于没有这种异步IO技术,所以使用的是epoll(上文介绍过的一种多路复用IO技术的实现)对异步IO进行模拟。

二、JAVA AIO框架简析

202202131612542612.png

JAVA AIO框架在windows下使用windows IOCP技术,在Linux下使用epoll多路复用IO技术模拟异步IO,这个从JAVA AIO框架的部分类设计上就可以看出来。
例如框架中,在Windows下负责实现套接字通道的具体类是“sun.nio.ch.WindowsAsynchronousSocketChannelImpl”,其引用的IOCP类型文档注释如是:

               /**
               * Windows implementation of AsynchronousChannelGroup encapsulating an I/O
               * completion port.
               */

特别说明一下,请注意在上图中的“java.nio.channels.NetworkChannel”接口,这个接口同样被JAVA NIO框架实现了,如下图所示:

202202131612556943.png

代码示例

客户端和第二篇文章一下,本篇只给出服务端代码
服务端:

               package demo.com.test.io.aio;
            
               import java.net.InetSocketAddress;
               import java.nio.channels.AsynchronousChannelGroup;
               import java.nio.channels.AsynchronousServerSocketChannel;
               import java.util.concurrent.ExecutorService;
               import java.util.concurrent.Executors;
            
               public class AioSocketServer {
            
            
                   private static final Object waitObject = new Object();
            
                   /**
                    * @param args
                    * @throws Exception
                    */
                   public static void main(String[] args) throws Exception {
                       /*
                        * 对于使用的线程池技术,我一定要多说几句
                        * 1、Executors是线程池生成工具,通过这个工具我们可以很轻松的生成“固定大小的线程池”、“调度池”、“可伸缩线程数量的池”。具体请看API Doc
                        * 2、当然您也可以通过ThreadPoolExecutor直接生成池。
                        * 3、这个线程池是用来得到操作系统的“IO事件通知”的,不是用来进行“得到IO数据后的业务处理的”。要进行后者的操作,您可以再使用一个池(最好不要混用)
                        * 4、您也可以不使用线程池(不推荐),如果决定不使用线程池,直接AsynchronousServerSocketChannel.open()就行了。
                        * */
                       ExecutorService threadPool = Executors.newFixedThreadPool(20);
                       AsynchronousChannelGroup group = AsynchronousChannelGroup.withThreadPool(threadPool);
                       final AsynchronousServerSocketChannel serverSocket = AsynchronousServerSocketChannel.open(group);
            
                       //设置要监听的端口“0.0.0.0”代表本机所有IP设备
                       serverSocket.bind(new InetSocketAddress("0.0.0.0", 8083));
                       //为AsynchronousServerSocketChannel注册监听,注意只是为AsynchronousServerSocketChannel通道注册监听
                       //并不包括为 随后客户端和服务器 socketchannel通道注册的监听
                       serverSocket.accept(null, new ServerSocketChannelHandle(serverSocket));
            
                       //等待,以便观察现象(这个和要讲解的原理本身没有任何关系,只是为了保证守护线程不会退出)
                       synchronized(waitObject) {
                           waitObject.wait();
                       }
                   }
               }
            
               package demo.com.test.io.aio;
            
               import java.nio.ByteBuffer;
               import java.nio.channels.AsynchronousServerSocketChannel;
               import java.nio.channels.AsynchronousSocketChannel;
               import java.nio.channels.CompletionHandler;
            
               import org.apache.commons.logging.Log;
               import org.apache.commons.logging.LogFactory;
            
               /**
                * 这个处理器类,专门用来响应 ServerSocketChannel 的事件。
                * ServerSocketChannel只有一种事件:接受客户端的连接
                * @author keep_trying
                */
               public class ServerSocketChannelHandle implements CompletionHandler<AsynchronousSocketChannel, Void> {
                   /**
                    * 日志
                    */
                   private static final Log LOGGER = LogFactory.getLog(ServerSocketChannelHandle.class);
            
                   private AsynchronousServerSocketChannel serverSocketChannel;
            
                   /**
                    * @param serverSocketChannel
                    */
                   public ServerSocketChannelHandle(AsynchronousServerSocketChannel serverSocketChannel) {
                       this.serverSocketChannel = serverSocketChannel;
                   }
            
                   /**
                    * 注意,我们分别观察 this、socketChannel、attachment三个对象的id。
                    * 来观察不同客户端连接到达时,这三个对象的变化,以说明ServerSocketChannelHandle的监听模式
                    */
                   @Override
                   public void completed(AsynchronousSocketChannel socketChannel, Void attachment) {
                       ServerSocketChannelHandle.LOGGER.info("completed(AsynchronousSocketChannel result, ByteBuffer attachment)");
                       //每次都要重新注册监听(一次注册,一次响应),但是由于“文件状态标示符”是独享的,所以不需要担心有“漏掉的”事件
                       this.serverSocketChannel.accept(attachment, this);
            
                       //为这个新的socketChannel注册“read”事件,以便操作系统在收到数据并准备好后,主动通知应用程序
                       //在这里,由于我们要将这个客户端多次传输的数据累加起来一起处理,所以我们将一个stringbuffer对象作为一个“附件”依附在这个channel上
                       //
                       ByteBuffer readBuffer = ByteBuffer.allocate(2550);
                       socketChannel.read(readBuffer, new StringBuffer(), new SocketChannelReadHandle(socketChannel , readBuffer));
                   }
            
                   /* (non-Javadoc)
                    * @see java.nio.channels.CompletionHandler#failed(java.lang.Throwable, java.lang.Object)
                    */
                   @Override
                   public void failed(Throwable exc, Void attachment) {
                       ServerSocketChannelHandle.LOGGER.info("failed(Throwable exc, ByteBuffer attachment)");
                   }
               }
            
               package demo.com.test.io.aio;
            
               import java.io.IOException;
               import java.io.UnsupportedEncodingException;
               import java.net.URLEncoder;
               import java.nio.ByteBuffer;
               import java.nio.channels.AsynchronousSocketChannel;
               import java.nio.channels.CompletionHandler;
            
               import org.apache.commons.logging.Log;
               import org.apache.commons.logging.LogFactory;
            
               /**
                * 负责对每一个socketChannel的数据获取事件进行监听。<p>
                *
                * 重要的说明:一个socketchannel都会有一个独立工作的SocketChannelReadHandle对象(CompletionHandler接口的实现),
                * 其中又都将独享一个“文件状态标示”对象FileDescriptor、
                * 一个独立的由程序员定义的Buffer缓存(这里我们使用的是ByteBuffer)、
                * 所以不用担心在服务器端会出现“窜对象”这种情况,因为JAVA AIO框架已经帮您组织好了。<p>
                *
                * 但是最重要的,用于生成channel的对象:AsynchronousChannelProvider是单例模式,无论在哪组socketchannel,
                * 对是一个对象引用(但这没关系,因为您不会直接操作这个AsynchronousChannelProvider对象)。
                * @author keep_trying
                */
               public class SocketChannelReadHandle implements CompletionHandler<Integer, StringBuffer> {
                   /**
                    * 日志
                    */
                   private static final Log LOGGER = LogFactory.getLog(SocketChannelReadHandle.class);
            
                   private AsynchronousSocketChannel socketChannel;
            
                   /**
                    * 专门用于进行这个通道数据缓存操作的ByteBuffer<br>
                    * 当然,您也可以作为CompletionHandler的attachment形式传入。<br>
                    * 这是,在这段示例代码中,attachment被我们用来记录所有传送过来的Stringbuffer了。
                    */
                   private ByteBuffer byteBuffer;
            
                   public SocketChannelReadHandle(AsynchronousSocketChannel socketChannel , ByteBuffer byteBuffer) {
                       this.socketChannel = socketChannel;
                       this.byteBuffer = byteBuffer;
                   }
            
                   /* (non-Javadoc)
                    * @see java.nio.channels.CompletionHandler#completed(java.lang.Object, java.lang.Object)
                    */
                   @Override
                   public void completed(Integer result, StringBuffer historyContext) {
                       //如果条件成立,说明客户端主动终止了TCP套接字,这时服务端终止就可以了
                       if(result == -1) {
                           try {
                               this.socketChannel.close();
                           } catch (IOException e) {
                               SocketChannelReadHandle.LOGGER.error(e);
                           }
                           return;
                       }
            
                       SocketChannelReadHandle.LOGGER.info("completed(Integer result, Void attachment) : 然后我们来取出通道中准备好的值");
                       /*
                        * 实际上,由于我们从Integer result知道了本次channel从操作系统获取数据总长度
                        * 所以实际上,我们不需要切换成“读模式”的,但是为了保证编码的规范性,还是建议进行切换。
                        *
                        * 另外,无论是JAVA AIO框架还是JAVA NIO框架,都会出现“buffer的总容量”小于“当前从操作系统获取到的总数据量”,
                        * 但区别是,JAVA AIO框架中,我们不需要专门考虑处理这样的情况,因为JAVA AIO框架已经帮我们做了处理(做成了多次通知)
                        * */
                       this.byteBuffer.flip();
                       byte[] contexts = new byte[1024];
                       this.byteBuffer.get(contexts, 0, result);
                       this.byteBuffer.clear();
                       try {
                           String nowContent = new String(contexts , 0 , result , "UTF-8");
                           historyContext.append(nowContent);
                           SocketChannelReadHandle.LOGGER.info("================目前的传输结果:" + historyContext);
                       } catch (UnsupportedEncodingException e) {
                           SocketChannelReadHandle.LOGGER.error(e);
                       }
            
                       //如果条件成立,说明还没有接收到“结束标记”
                       if(historyContext.indexOf("over") == -1) {
                           return;
                       }else{
                           //清空已经读取的缓存,并从新切换为写状态(这里要注意clear()和capacity()两个方法的区别)
                           this.byteBuffer.clear();
                           SocketChannelReadHandle.LOGGER.info("客户端发来的信息======message : " + historyContext);
            
                           //======================================================
                           //          当然接受完成后,可以在这里正式处理业务了        
                           //======================================================
            
                           //回发数据,并关闭channel
                           ByteBuffer sendBuffer = null;
                           try {
                               sendBuffer = ByteBuffer.wrap(URLEncoder.encode("你好客户端,这是服务器的返回数据", "UTF-8").getBytes());
                               socketChannel.write(sendBuffer);
                               socketChannel.close();
                           } catch (Exception e) {
                               e.printStackTrace();
                           }
                       }
            
                       //=========================================================================
                       //          和上篇文章的代码相同,我们以“over”符号作为客户端完整信息的标记
                       //=========================================================================
                       SocketChannelReadHandle.LOGGER.info("=======收到完整信息,开始处理业务=========");
                       historyContext = new StringBuffer();
            
                       //还要继续监听(一次监听一次通知)
                       this.socketChannel.read(this.byteBuffer, historyContext, this);
                   }
            
                   /* (non-Javadoc)
                    * @see java.nio.channels.CompletionHandler#failed(java.lang.Throwable, java.lang.Object)
                    */
                   @Override
                   public void failed(Throwable exc, StringBuffer historyContext) {
                       SocketChannelReadHandle.LOGGER.info("=====发现客户端异常关闭,服务器将关闭TCP通道");
                       try {
                           this.socketChannel.close();
                       } catch (IOException e) {
                           SocketChannelReadHandle.LOGGER.error(e);
                       }
                   }
               }

要点讲解

  • 在JAVA NIO框架中,我们说到了一个重要概念“selector”(选择器)。它负责代替应用查询中所有已注册的通道到操作系统中进行IO事件轮询、管理当前注册的通道集合,定位发生事件的通道等操操作;但是在JAVA AIO框架中,由于应用程序不是“轮询”方式,而是订阅-通知方式,所以不再需要“selector”(选择器)了,改由channel通道直接到操作系统注册监听。
  • JAVA AIO框架中,只实现了两种网络IO通道“AsynchronousServerSocketChannel”(服务器监听通道)、“AsynchronousSocketChannel”(socket套接字通道)。但是无论哪种通道他们都有独立的fileDescriptor(文件标识符)、attachment(附件,附件可以使任意对象,类似“通道上下文”),并被独立的SocketChannelReadHandle类实例引用。我们通过debug操作来看看它们的引用结构:

在测试过程中,我们启动了两个客户端,然后我们观察服务器端对这两个客户端通道的处理情况:

202202131612563464.png

可以看到,在服务器端分别为客户端1和客户端2创建的两个WindowsAsynchronousSocketChannelImpl对象为:

202202131612570245.png

客户端1:WindowsAsynchronousSocketChannelImpl:760 | FileDescriptor:762

客户端2:WindowsAsynchronousSocketChannelImpl:792 | FileDescriptor:797

接下来,我们让两个客户端发送信息到服务器端,并观察服务器端的处理情况。客户端1发来的消息和客户端2发来的消息,在服务器端的处理情况如下图所示:

202202131612576696.png

客户端1:WindowsAsynchronousSocketChannelImpl:760 | FileDescriptor:762 | SocketChannelReadHandle:803 | HeapByteBuffer:808

客户端2:WindowsAsynchronousSocketChannelImpl:792 | FileDescriptor:797 | SocketChannelReadHandle:828 | HeapByteBuffer:833

可以明显看到,服务器端处理每一个客户端通道所使用的SocketChannelReadHandle(处理器)对象都是独立的,并且所引用的SocketChannel对象都是独立的。

  • JAVA NIO和JAVA AIO框架,除了因为操作系统的实现不一样而去掉了Selector外,其他的重要概念都是存在的,例如上文中提到的Channel的概念,还有演示代码中使用的Buffer缓存方式。实际上JAVA NIO和JAVA AIO框架您可以看成是一套完整的“高并发IO处理”的实现。
阅读全文
  • 点赞