Java NIO 之 Selector
这篇博文将来记录下自己Java NIO中另一个重要的东西—-Selector。
关于Selector确实不太好理解,自己也看了很多博客,到目前为止,自己对Selector还是没有弄清楚,感觉模模糊糊的。最主要的可能是没有找到对应的应用场景,不知道在上面地方能够用到Selector。
关于Selector的相关介绍,并发编程网的这篇博文介绍的比较好:http://ifeve.com/selectors/。
本篇博文就不对Selector进行介绍了,本着学习的态度,自己也来尝试通过写Demo来理解下Selector,于是就有了这篇博文。
Selector例子
本应用设想的场景是这样的:
在服务器端:
1、首先将ServerSocketChannel注册在Selector中,并为此通道注册SelectionKey.OP_ACCEPT事件。
2、当有客户端连接进来之后,将SocketChannel注册到Selector中,并为此通道注册SelectionKey.OP_READ事件,便于在服务器端接收到客户端发送过来的数据。
在客户端:
1、首先将SocketChannel注册在Selector中,并为此通道注册SelectionKey.OP_CONNECT事件。
2、当客户端和服务器端连接成功之后,将其注册到Selector中,并为此通道注册SelectionKey.OP_READ事件,便于在客户端接收到服务器点发送过来的数据。
最后,实现客户端和服务器端之间的交互。
这个Demo具体的交互工程如下
客户端和服务器端建立连接后,客户端第一次发送 1,第二个发送2,第N次发送N;服务器端每接收一个消息就给客户端发送一个“收到消息”。
仿照这篇博文中的代码:http://blog.csdn.net/anders_zhuo/article/details/8535719
写了下代码,发现并没有实现如上的功能,有一定的问题,问题还没有找出来,明天继续来找。
实现代码如下:
package selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOServer {
//通道选择器
private Selector selector;
public NIOServer(){
try {
selector = Selector.open();
} catch (IOException e) {
e.printStackTrace();
}
}
/*
* 函数功能:服务器端开始监听,看是否有客户端连接进来
* */
private void listen() throws IOException {
System.out.println("server running....");
while(true){
// 当注册事件到达时,方法返回,否则该方法会一直阻塞
selector.select();
// 获得selector中选中的相的迭代器,选中的相为注册的事件
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> ite = set.iterator();
while(ite.hasNext()){
SelectionKey selectionKey = (SelectionKey) ite.next();
// 删除已选的key 以防重负处理
ite.remove();
if(selectionKey.isAcceptable()){//如果有客户端连接进来
//先拿到这个SelectionKey里面的ServerSocketChannel。
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)selectionKey.channel();
// 获得和客户端连接的通道
SocketChannel socketChannel = serverSocketChannel.accept();
System.out.println("有客户端连接到服务器!!!");
socketChannel.configureBlocking(false);//将此通道设置为非阻塞
//服务器端向客户端发送数据
// ByteBuffer buf = ByteBuffer.allocate(40);
// buf.put("hello client!!!".getBytes());
// socketChannel.write(buf);
socketChannel.write(ByteBuffer.wrap(new String("hello client!").getBytes()));
//为了接收客户端发送过来的数据,需要将此通道绑定到选择器上,并为该通道注册读事件
socketChannel.register(selector, SelectionKey.OP_READ);
}
else if(selectionKey.isReadable()){//客户端发送数据过来了
//先拿到这个SelectionKey里面的SocketChannel。
SocketChannel socketChannel = (SocketChannel)selectionKey.channel();
//接收来自于客户端发送过来的数据
ByteBuffer buf = ByteBuffer.allocate(20);
int len = 0;
System.out.println("服务器端接收到的数据为:");
while((len=socketChannel.read(buf))!=-1){
// while(buf.hasRemaining()){
// System.out.println(buf.get()+" ");
// }
byte[] receData = buf.array();
String msg = new String(receData).trim();
System.out.println("接收来自客户端的数据为:"+msg);
buf.clear();
}
//服务器端向客户端发送"确定信息"
buf.put("收到信息".getBytes());
while(buf.hasRemaining()){
socketChannel.write(buf);
}
//buf.clear();
}
}
}
}
/*
* 函数功能:初始化serverSocketChannel来监听指定的端口是否有新的TCP连接,
* 并将serverSocketChannel注册到selector中
* */
private void init(int port) {
try {
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
//serverSocketChannel监听指定端口
serverSocketChannel.socket().bind(new InetSocketAddress(port));
serverSocketChannel.configureBlocking(false);//设置为非阻塞模式
/*
* 将serverSocketChannel注册到selector中,并为该通道注册selectionKey.OP_ACCEPT事件
* 注册该事件后,当事件到达的时候,selector.select()会返回, 如果事件没有到达selector.select()会一直阻塞
* */
serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws IOException {
NIOServer server = new NIOServer();
server.init(9999);
server.listen();
}
}
客户端
package selector;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;
public class NIOClient {
private Selector selector;
public NIOClient() throws IOException {
this.selector =Selector.open();
}
private void init(String address,int port) throws IOException{
//客户端,首先有一个SocketChannel
SocketChannel socketChannel = SocketChannel.open();
socketChannel.configureBlocking(false);//将此通道设置为非阻塞模式
//连接
socketChannel.connect(new InetSocketAddress(address,port));
//将SocketChannel注册到selector中,并为该通道注册SelectionKey.OP_CONNECT
socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
public static void main(String[] args) throws IOException {
NIOClient client = new NIOClient();
client.init("localhost",9999);
client.connect();
}
private void connect() throws IOException {
int data = 1;
while(true){
selector.select();//
Set<SelectionKey> set = selector.selectedKeys();
Iterator<SelectionKey> ite = set.iterator();
while(ite.hasNext()){
SelectionKey selectionKey = (SelectionKey) ite.next();
ite.remove(); //删除已选的key,以防重复处理
if(selectionKey.isConnectable()){//看是否有连接发生
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//如果正在连接,则完成连接
if(socketChannel.isConnectionPending()){
socketChannel.finishConnect();
}
socketChannel.configureBlocking(false);//设置为非阻塞模式
//给服务器端发送数据
System.out.println("客户端连接上了服务器端。。。。");
// ByteBuffer buf = ByteBuffer.allocate(128);
// buf.put("hello server.....".getBytes());
// socketChannel.write(buf);
socketChannel.write(ByteBuffer.wrap(new String("hello server!").getBytes()));
//为了接收来自服务器端的数据,将此通道注册到选择器中
socketChannel.register(selector, SelectionKey.OP_READ);
}
else if(selectionKey.isReadable()){
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
//接收来自于服务器端发送过来的数据
ByteBuffer buf = ByteBuffer.allocate(10);
int len = 0;
System.out.println("客户端接收到的数据为:");
while((len=socketChannel.read(buf))!=-1){
// while(buf.hasRemaining()){
// System.out.println(buf.get()+" ");
// }
byte[] receData = buf.array();
String msg = new String(receData).trim();
System.out.println("接收来自服务器端的数据为:"+msg);
buf.clear();
}
//发送数据
buf.put((data+"").getBytes());
while(buf.hasRemaining()){
socketChannel.write(buf);
}
data++;
}
}
}
}
}
Selector例子(修正)
在上面的例子中,就如昨天所说,还存在一点问题,客户端和服务器端还不能够正常的交互。2016年10月11日20:30:54。
通过debug发现产生问题的代码如下:
int len = 0;
while((len=socketChannel.read(buf))!=-1){
byte[] receData = buf.array();
String msg = new String(receData).trim();
System.out.println("接收来自客户端的数据为:"+msg);
buf.clear();
}
发现在上面的while循环中是一个死循环,是不能够退出的。也就是说,socketChannel管道中的数据并不是读一次这个数据就没有了,而是还存于管道中,即使服务器端只发送一次数据给客户端,客户端通过如上的while循环的代码来读取数据是不会结束的。
将上面的代码更改为如下的代码即可:即采用更大的空间只读一次
//接收来自于客户端发送过来的数据
ByteBuffer buf = ByteBuffer.allocate(128);
socketChannel.read(buf);
byte[] receData = buf.array();
String msg = new String(receData).trim();
System.out.println("接收来自客户端的数据为:"+msg);
buf.clear();
修正这个问题之后基本就客户端和服务器端基本就能够交互了
但是由于我这个Demo想完成的任务为:客户端和服务器端建立连接后,客户端第一次发送 1,第二个发送2,第N次发送N;服务器端每接收一个消息就给客户端发送一个“收到消息”。
但是发现服务器端收到的消息并不是1,2,3.。。。
查找原因发现客户端发送数据不能使用如下的代码:
//发送数据
buf.put((data+"").getBytes());//data初始值为1
while(buf.hasRemaining()){
socketChannel.write(buf);
}
data++;
换成如下的代码就可以正常工作了
socketChannel.write(ByteBuffer.wrap(new String(data+"").getBytes()));
data++;
至于原因,目前还不知道为什么也。
能够正常工作的完整代码可以在这里提取:https://github.com/wojiushimogui/Selector
小结
对Selector不怎么熟悉,模仿别人的代码来实现客户端和服务器端之间的交互,还存在一些问题。