2023-01-14
原文作者: HelloWorld_EE 原文地址:https://blog.csdn.net/u010412719/category_6159934_2.html

Java NIO 之 Selector

这篇博文将来记录下自己Java NIO中另一个重要的东西—-Selector。

关于Selector确实不太好理解,自己也看了很多博客,到目前为止,自己对Selector还是没有弄清楚,感觉模模糊糊的。最主要的可能是没有找到对应的应用场景,不知道在上面地方能够用到Selector。

关于Selector的相关介绍,并发编程网的这篇博文介绍的比较好:http://ifeve.com/selectors/
本篇博文就不对Selector进行介绍了,本着学习的态度,自己也来尝试通过写Demo来理解下Selector,于是就有了这篇博文。

Selector例子

本应用设想的场景是这样的:

在服务器端:

1、首先将ServerSocketChannel注册在Selector中,并为此通道注册SelectionKey.OP_ACCEPT事件。

2、当有客户端连接进来之后,将SocketChannel注册到Selector中,并为此通道注册SelectionKey.OP_READ事件,便于在服务器端接收到客户端发送过来的数据。

在客户端:

1、首先将SocketChannel注册在Selector中,并为此通道注册SelectionKey.OP_CONNECT事件。

2、当客户端和服务器端连接成功之后,将其注册到Selector中,并为此通道注册SelectionKey.OP_READ事件,便于在客户端接收到服务器点发送过来的数据。

最后,实现客户端和服务器端之间的交互。

这个Demo具体的交互工程如下
客户端和服务器端建立连接后,客户端第一次发送 1,第二个发送2,第N次发送N;服务器端每接收一个消息就给客户端发送一个“收到消息”。

仿照这篇博文中的代码:http://blog.csdn.net/anders_zhuo/article/details/8535719

写了下代码,发现并没有实现如上的功能,有一定的问题,问题还没有找出来,明天继续来找。

实现代码如下:

        package selector;
    
        import java.io.IOException;
        import java.net.InetSocketAddress;
        import java.nio.ByteBuffer;
        import java.nio.channels.SelectionKey;
        import java.nio.channels.Selector;
        import java.nio.channels.ServerSocketChannel;
        import java.nio.channels.SocketChannel;
        import java.util.Iterator;
        import java.util.Set;
    
        public class NIOServer {
    
            //通道选择器
            private Selector selector;
            public NIOServer(){
                try {
                    selector = Selector.open();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            /*
             * 函数功能:服务器端开始监听,看是否有客户端连接进来
             * */
            private void listen() throws IOException {
                System.out.println("server running....");
                while(true){
                    // 当注册事件到达时,方法返回,否则该方法会一直阻塞 
                    selector.select();
                    // 获得selector中选中的相的迭代器,选中的相为注册的事件 
                    Set<SelectionKey> set = selector.selectedKeys();
                    Iterator<SelectionKey> ite = set.iterator();
                    while(ite.hasNext()){
                        SelectionKey selectionKey = (SelectionKey) ite.next();
                        // 删除已选的key 以防重负处理  
                        ite.remove(); 
                        if(selectionKey.isAcceptable()){//如果有客户端连接进来
                            //先拿到这个SelectionKey里面的ServerSocketChannel。
                            ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectionKey.channel();
                            // 获得和客户端连接的通道
                            SocketChannel socketChannel = serverSocketChannel.accept();
                            System.out.println("有客户端连接到服务器!!!");
                            socketChannel.configureBlocking(false);//将此通道设置为非阻塞
                            //服务器端向客户端发送数据
        //                  ByteBuffer buf = ByteBuffer.allocate(40);
        //                  buf.put("hello client!!!".getBytes());
        //                  socketChannel.write(buf);
                            socketChannel.write(ByteBuffer.wrap(new String("hello client!").getBytes()));
                            //为了接收客户端发送过来的数据,需要将此通道绑定到选择器上,并为该通道注册读事件  
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }
                        else if(selectionKey.isReadable()){//客户端发送数据过来了
                            //先拿到这个SelectionKey里面的SocketChannel。
                            SocketChannel socketChannel = (SocketChannel)selectionKey.channel();           
    
                            //接收来自于客户端发送过来的数据
                            ByteBuffer buf = ByteBuffer.allocate(20);
                            int len = 0;
                            System.out.println("服务器端接收到的数据为:");
                            while((len=socketChannel.read(buf))!=-1){
    
        //                      while(buf.hasRemaining()){
        //                          System.out.println(buf.get()+" ");
        //                      }
                                byte[] receData = buf.array();
                                String msg = new String(receData).trim();
                                System.out.println("接收来自客户端的数据为:"+msg);
                                buf.clear();
                            }
    
                            //服务器端向客户端发送"确定信息"
    
                            buf.put("收到信息".getBytes());
                            while(buf.hasRemaining()){
                                socketChannel.write(buf);
                            }   
                            //buf.clear();
                        }
                    }
    
                }
    
            }
            /*
             * 函数功能:初始化serverSocketChannel来监听指定的端口是否有新的TCP连接,
             * 并将serverSocketChannel注册到selector中
             * */
            private void init(int port) {
                try {
                    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
                    //serverSocketChannel监听指定端口
                    serverSocketChannel.socket().bind(new InetSocketAddress(port));
                    serverSocketChannel.configureBlocking(false);//设置为非阻塞模式
    
                    /*
                     * 将serverSocketChannel注册到selector中,并为该通道注册selectionKey.OP_ACCEPT事件  
                     * 注册该事件后,当事件到达的时候,selector.select()会返回,  如果事件没有到达selector.select()会一直阻塞
                     * */
                    serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
    
            public static void main(String[] args) throws IOException {
                NIOServer server = new NIOServer();
                server.init(9999);
                server.listen();
            }
    
        }

客户端

        package selector;
    
        import java.io.IOException;
        import java.net.InetSocketAddress;
        import java.nio.ByteBuffer;
        import java.nio.channels.SelectionKey;
        import java.nio.channels.Selector;
        import java.nio.channels.SocketChannel;
        import java.util.Iterator;
        import java.util.Set;
    
        public class NIOClient {
    
            private Selector selector;
    
            public NIOClient() throws IOException {
                this.selector =Selector.open();
            }
    
            private void init(String address,int port) throws IOException{
                //客户端,首先有一个SocketChannel
                SocketChannel socketChannel = SocketChannel.open();
                socketChannel.configureBlocking(false);//将此通道设置为非阻塞模式
                //连接
                socketChannel.connect(new InetSocketAddress(address,port));
    
                //将SocketChannel注册到selector中,并为该通道注册SelectionKey.OP_CONNECT
                socketChannel.register(selector, SelectionKey.OP_CONNECT);
    
    
            }
    
            public static void main(String[] args) throws IOException {
                NIOClient client = new NIOClient();
                client.init("localhost",9999);
                client.connect();
    
            }
    
            private void connect() throws IOException {
                int data = 1;
                while(true){
                    selector.select();//
                    Set<SelectionKey> set = selector.selectedKeys();
                    Iterator<SelectionKey> ite = set.iterator();
    
                    while(ite.hasNext()){
                        SelectionKey selectionKey = (SelectionKey) ite.next();
                        ite.remove(); //删除已选的key,以防重复处理
                        if(selectionKey.isConnectable()){//看是否有连接发生
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            //如果正在连接,则完成连接
                            if(socketChannel.isConnectionPending()){
                                socketChannel.finishConnect();
                            }
                            socketChannel.configureBlocking(false);//设置为非阻塞模式
                            //给服务器端发送数据
                            System.out.println("客户端连接上了服务器端。。。。");
        //                  ByteBuffer buf = ByteBuffer.allocate(128);
        //                  buf.put("hello server.....".getBytes());
        //                  socketChannel.write(buf);
                            socketChannel.write(ByteBuffer.wrap(new String("hello server!").getBytes()));
                            //为了接收来自服务器端的数据,将此通道注册到选择器中
                            socketChannel.register(selector, SelectionKey.OP_READ);
                        }
                        else if(selectionKey.isReadable()){
                            SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
                            //接收来自于服务器端发送过来的数据
                            ByteBuffer buf = ByteBuffer.allocate(10);
                            int len = 0;
                            System.out.println("客户端接收到的数据为:");
                            while((len=socketChannel.read(buf))!=-1){
        //                      while(buf.hasRemaining()){
        //                          System.out.println(buf.get()+" ");
        //                      }
                                byte[] receData = buf.array();
                                String msg = new String(receData).trim();
                                System.out.println("接收来自服务器端的数据为:"+msg);
                                buf.clear();
                            }
                            //发送数据
                            buf.put((data+"").getBytes());
                            while(buf.hasRemaining()){
                                socketChannel.write(buf);
                            }
    
                            data++;
                        }           
    
    
                    }
    
                }
    
            }
    
        }

Selector例子(修正)

在上面的例子中,就如昨天所说,还存在一点问题,客户端和服务器端还不能够正常的交互。2016年10月11日20:30:54。

通过debug发现产生问题的代码如下:

                        int len = 0;
                        while((len=socketChannel.read(buf))!=-1){
    
                            byte[] receData = buf.array();
                            String msg = new String(receData).trim();
                            System.out.println("接收来自客户端的数据为:"+msg);
                            buf.clear();
                        }

发现在上面的while循环中是一个死循环,是不能够退出的。也就是说,socketChannel管道中的数据并不是读一次这个数据就没有了,而是还存于管道中,即使服务器端只发送一次数据给客户端,客户端通过如上的while循环的代码来读取数据是不会结束的。

将上面的代码更改为如下的代码即可:即采用更大的空间只读一次

                        //接收来自于客户端发送过来的数据
                        ByteBuffer buf = ByteBuffer.allocate(128);
                        socketChannel.read(buf);
                        byte[] receData = buf.array();
                        String msg = new String(receData).trim();
                        System.out.println("接收来自客户端的数据为:"+msg);
                        buf.clear();

修正这个问题之后基本就客户端和服务器端基本就能够交互了

但是由于我这个Demo想完成的任务为:客户端和服务器端建立连接后,客户端第一次发送 1,第二个发送2,第N次发送N;服务器端每接收一个消息就给客户端发送一个“收到消息”。

但是发现服务器端收到的消息并不是1,2,3.。。。

查找原因发现客户端发送数据不能使用如下的代码:

                        //发送数据
                        buf.put((data+"").getBytes());//data初始值为1
                        while(buf.hasRemaining()){
                            socketChannel.write(buf);
                        }
                        data++;

换成如下的代码就可以正常工作了

                socketChannel.write(ByteBuffer.wrap(new String(data+"").getBytes()));
                data++;

至于原因,目前还不知道为什么也。

能够正常工作的完整代码可以在这里提取:https://github.com/wojiushimogui/Selector

小结

对Selector不怎么熟悉,模仿别人的代码来实现客户端和服务器端之间的交互,还存在一些问题。

阅读全文