上一章,我对分布式文件系统的存储架构进行了整体讲解。我们回顾一下,dfs-client客户端首先需要通过RPC调用获取DataNode节点信息,然后再基于Java NIO完成文件的上传和下载。本章,我就来实现Java NIO这块的功能。
一、DFS客户端接口
首先,我们来考虑下如何基于Java NIO实现DFS客户端的文件上传和下载功能。DFS客户端需要提供两个接口,并整合上一章讲到的RPC调用:
/**
* dfs客户端接口
*/
public interface FileSystem {
/**
* 上传一个文件
*
* @param file file byte array
* @param filename 文件名,以如如文件分隔符开头,则移除
* @param fileSize file size
*/
Boolean upload(byte[] file, String filename, long fileSize) throws Exception;
/**
* 下载一个文件
*
* @param filename 文件名
* @return 文件的字节数组
* @throws Exception
*/
byte[] download(String filename) throws Exception;
}
1.1 上传流程
文件上传流程如下:
- 客户端将待上传的文件元数据通过RPC接口发送给NameNode;
- 客户端收到成功响应后,调用RPC接口获取可用的DataNode节点;
- 客户端遍历DataNode,依次基于Java NIO完成文件上传。
/**
* dfs客户端接口的实现类
*/
public class FileSystemImpl implements FileSystem {
private NameNodeRpcClient rpcClient = new NameNodeRpcClient();
public Boolean upload(byte[] file, String filename, long fileSize) throws Exception {
// 1.RPC接口发送文件元数据
if (!filename.startsWith(File.separator)) {
filename = File.separator + filename;
}
if (!rpcClient.createFile(filename)) {
return false;
}
// 2.RPC接口获取DataNode
String datanodesJson = rpcClient.allocateDataNodes(filename, fileSize);
System.out.println(datanodesJson);
if (datanodesJson == null) {
return false;
}
// 3.遍历DataNode,依次上传文件
JSONArray datanodes = JSONArray.parseArray(datanodesJson);
for (int i = 0; i < datanodes.size(); i++) {
JSONObject datanode = datanodes.getJSONObject(i);
String hostname = datanode.getString("hostname");
int nioPort = datanode.getIntValue("nioPort");
// NIO客户端
DFSNIOClient.sendFile(hostname, nioPort, file, filename, fileSize);
}
return true;
}
}
1.2 下载流程
文件下载流程如下:
- 客户端通过调用RPC接口,从NameNode获取待下载文件对应的可用DataNode节点;
- 客户端解析出DataNode节点信息;
- 客户端基于Java NIO完成文件下载。
// FileSystemImpl.java
public byte[] download(String filename) throws Exception {
// 1.获取待下载文件对应的可用DataNode节点
String datanode = rpcClient.getDataNodeForFile(filename);
System.out.println("NameNode分配用来下载文件的数据节点:" + datanode);
// 2.解析DataNode信息
JSONObject jsonObject = JSONObject.parseObject(datanode);
String hostname = jsonObject.getString("hostname");
Integer nioPort = jsonObject.getInteger("nioPort");
// 3.基于Java NIO下载文件
return DFSNIOClient.readFile(hostname, nioPort, filename);
}
二、文件上传
接下来,我们就要正式基于Java NIO来实现文件上传和下载功能了,先从文件上传开始。关于Java NIO,在使用上其实并没有什么难点,无非就是对于NIO包的使用。原生NIO的使用,最核心也是最难的一点在于对于 拆包 和 粘包 问题的处理。
2.1 客户端
首先,我们需要在dsf-client
客户端工程中新增一个NIO客户端类——DFSNIOClient
,并提供文件上传的sendfile
接口。由于NIO底层基于TCP流进行传输,所以我们需要自定义报文格式:
requestType | filenameLength | filename | fileSize | file
也就是说,一个完整的文件数据包应包含以下内容:
- 4字节请求类型标识:1-文件上传 2-文件下载;
- 4字节文件名大小标识;
- 文件名内容;
- 8字节文件内容大小标识;
- 文件内容。
NIO客户端发送文件流的代码并不复杂,遵循Java NIO编程的模板,大部分都是一些Boilerplate Code:
- 创建SocketChannel,指定要连接的IP和端口;
- 在Channel上注册Selector,并关注
OP_CONNECT
事件; - 调用
Selector.select()
阻塞等待,然后遍历各个SelectionKey,按照不同事件类型进行处理。
我们需要重点关注的是发送请求时对拆包的处理,主要采取的处理手段有两点:
- 自定义报文格式;
- 缓存ByteBuffer,判断一个完整数据包是否发送完毕。
/**
* NIO Client,负责跟DataNode进行网络通信
*/
public class DFSNIOClient {
// 文件上传
public static final Integer SEND_FILE = 1;
// 文件下载
public static final Integer READ_FILE = 2;
/**
* 上传文件
*/
public static void sendFile(String hostname, int nioPort, byte[] file, String filename, long fileSize) {
Selector selector = null;
SocketChannel channel = null;
ByteBuffer buffer = null;
try {
// 1.与建立短连接
selector = Selector.open();
channel = SocketChannel.open();
// 非阻塞模式,调用完connect立即返回
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(hostname, nioPort));
// 关注SocketChannel的OP_CONNECT事件
channel.register(selector, SelectionKey.OP_CONNECT);
boolean sending = true;
while (sending) {
// 这里是同步调用,线程会同步等待直到SocketChannel有事件发生,所以NIO是一种“同步非阻塞模式”
selector.select();
// 2.遍历SelectionKey
Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
while (keysIterator.hasNext()) {
SelectionKey key = (SelectionKey) keysIterator.next();
keysIterator.remove();
// 3.1 连接建立
if (key.isConnectable()) {
channel = (SocketChannel) key.channel();
// 该channel正在建立TCP连接
if (channel.isConnectionPending()) {
// 三次握手,直到TCP连接建立完成
while (!channel.finishConnect()) {
Thread.sleep(100);
}
}
System.out.println("完成与服务端的连接的建立......");
}
// 3.2 发送数据到NIO Server
else if (key.isWritable()) {
if (buffer == null) {
// 封装文件请求数据(固定格式:TYPE+文件名大小+文件名+文件大小+文件内容)
// requestType | filenameLength | filename | fileSize | file
buffer = ByteBuffer.allocate(4 + 4 + filename.getBytes().length + 8 + (int) fileSize);
System.out.println("准备发送的数据包大小为:" + buffer.capacity());
// 4字节请求类型标识:1-文件上传 2-文件下载
buffer.putInt(SEND_FILE);
// 4字节文件名大小标识
buffer.putInt(filename.getBytes().length);
// 文件名内容
buffer.put(filename.getBytes());
// 8字节文件内容大小标识
buffer.putLong(fileSize);
// 文件内容
buffer.put(file);
buffer.rewind();
int sent = channel.write(buffer);
System.out.println("已经发送了" + sent + " bytes的数据到服务端去");
// 这里是对“拆包”问题进行处理”
if (buffer.hasRemaining()) {
System.out.println("本次数据包没有发送完毕,下次会继续发送.......");
key.interestOps(SelectionKey.OP_WRITE);
} else {
System.out.println("本次数据包发送完毕,准备读取服务端的响应......");
buffer = null;
key.interestOps(SelectionKey.OP_READ);
}
} else {
channel = (SocketChannel) key.channel();
int sent = channel.write(buffer);
System.out.println("上一次数据包没有发送完毕,本次继续发送了" + sent + " bytes");
if (!buffer.hasRemaining()) {
System.out.println("本次数据包没有发送完毕,下次会继续发送.......");
key.interestOps(SelectionKey.OP_READ);
}
}
}
// 3.3 接收NIOServer响应
else if (key.isReadable()) {
channel = (SocketChannel) key.channel();
buffer = ByteBuffer.allocate(1024);
int len = channel.read(buffer);
buffer.flip();
if (len > 0) {
System.out.println("[" + Thread.currentThread().getName()
+ "]收到" + hostname + "的响应:" + new String(buffer.array(), 0, len));
sending = false;
}
}
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
//...
}
}
}
Java NIO,本质是一种”同步非阻塞“模式。Selector在OS底层会采用非阻塞方式监听多个SocketChannel,而对于调用Selector.select()
的线程而言,是一种同步调用,调用线程会同步等待直到SocketChannel有事件发生。
注意:上述代码,我没有再对客户端处理Server响应时的”拆包“问题进行处理,因为客户端响应的内容较少,而且后面将文件下载时我会专门针对该问题的处理进行讲解。
三、文件下载
文件下载也是基于NIO进行网络通信。对于客户端和服务端,需要重点注意对”拆包“问题的处理。
3.1 客户端
NIO客户端在进行文件下载时,处理流程如下:
- 首先,发送一个请求给服务端,告知要下载的文件,我们还是一样自定义报文格式。
- 接着,读取Server的响应,解析字节流。
这里在处理NIO Server的响应时,文件长度、文件内容都可能出现”拆包“问题,所以我用了两个缓存对象——fileLengthBuffer
和fileBuffer
,当这两个Buffer没有剩余空间时,代表读完了完整数据。这也是处理拆包问题的一种常见思路:
// DFSNIOClient.java
/**
* 下载文件
*/
public static byte[] readFile(String hostname, Integer nioPort, String filename) {
ByteBuffer fileLengthBuffer = null;
Long fileLength = null;
ByteBuffer fileBuffer = null;
byte[] file = null;
SocketChannel channel = null;
Selector selector = null;
try {
channel = SocketChannel.open();
channel.configureBlocking(false);
channel.connect(new InetSocketAddress(hostname, nioPort));
selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);
boolean reading = true;
while (reading) {
selector.select();
Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
while (keysIterator.hasNext()) {
SelectionKey key = (SelectionKey) keysIterator.next();
keysIterator.remove();
// 1.建立连接
if (key.isConnectable()) {
channel = (SocketChannel) key.channel();
if (channel.isConnectionPending()) {
// 三次握手,直到TCP连接建立完成
while (!channel.finishConnect()) {
Thread.sleep(100);
}
}
System.out.println("完成与服务端的连接的建立......");
// 1.1 立即发送一个请求给服务端,告知要下载的文件
// requestType | filenameLength | filename | fileSize
byte[] filenameBytes = filename.getBytes();
ByteBuffer readFileRequest = ByteBuffer.allocate(4 + 4 + filenameBytes.length);
readFileRequest.putInt(READ_FILE);
readFileRequest.putInt(filenameBytes.length);
readFileRequest.put(filenameBytes);
readFileRequest.flip();
channel.write(readFileRequest);
System.out.println("发送文件下载的请求过去......");
key.interestOps(SelectionKey.OP_READ);
}
// 2.读取响应内容
else if (key.isReadable()) {
channel = (SocketChannel) key.channel();
// 2.1 处理文件大小的拆包
if (fileLength == null) {
if (fileLengthBuffer == null) {
fileLengthBuffer = ByteBuffer.allocate(8);
}
channel.read(fileLengthBuffer);
if (!fileLengthBuffer.hasRemaining()) {
fileLengthBuffer.rewind();
fileLength = fileLengthBuffer.getLong();
System.out.println("从服务端返回数据中解析文件大小:" + fileLength);
}
}
// 2.2 处理文件内容的拆包
if (fileLength != null) {
if (fileBuffer == null) {
fileBuffer = ByteBuffer.allocate(Integer.valueOf(String.valueOf(fileLength)));
}
int hasRead = channel.read(fileBuffer);
System.out.println("从服务端读取了" + hasRead + " bytes的数据出来到内存中");
if (!fileBuffer.hasRemaining()) {
fileBuffer.rewind();
file = fileBuffer.array();
System.out.println("最终获取到的文件的大小为" + file.length + " bytes");
reading = false;
}
}
}
}
}
return file;
} catch (Exception e) {
e.printStackTrace();
} finally {
//...
}
return null;
}
二、NIO服务端
文件上传和下载的复杂之处在于DataNode侧的NIO Server的编码,需要考虑TCP的粘包和拆包的问题。我在实现时仅针对拆包问题进行了一定处理(也就说,Server接受文件时,可能分多次接收到了同一个文件的数据流),粘包问题没有考虑,默认认为一个连接就传输一个文件。
DataNode负责进行文件上传处理的组件是DataNodeNIOServer
,它会根据客户端地址,将同一个客户端的请求路由到唯一的内存队列中,然后为每个内存队列分配一个工作线程进行处理:
2.1 DataNodeNIOServer
DataNodeNIOServer的主体实现如下,DataNodeNIOServer主线程会监听各类NIO事件,然后通过handleRequest
方法进行预处理,最后将成功建立连接的SocketChannel分发到内存队列中,由Woker线程进行处理。
需要注意的是,我这边 用了很多Map来缓存那些因为拆包问题而没发送完的文件内容 ,比如说一个100kb的文件,拆分成了两次来调用handleRequest
,第一次是过来80kb,第二次是过来20kb。
因为我们自定义的报文格式是:requestType | filenameLength | filename | fileSize | file
,所以无论是文件名、文件大小、还是文件内容,都有可能发生拆包。
/**
* DataNode NIO通信组件
*/
public class DataNodeNIOServer extends Thread {
private NameNodeRpcClient rpcClient;
public static final Integer SEND_FILE = 1;
public static final Integer READ_FILE = 2;
private Selector selector;
private List<LinkedBlockingQueue<SelectionKey>> queues = new ArrayList<>();
// 缓存的上一次未处理完请求,Key为客户端IP
private Map<String, InflightRequest> cachedRequestMap = new ConcurrentHashMap<>();
// 缓存没读取完的请求类型
private Map<String, ByteBuffer> requestTypeByClient = new ConcurrentHashMap<String, ByteBuffer>();
// 缓存没读取完的文件名大小
private Map<String, ByteBuffer> filenameLengthByClient = new ConcurrentHashMap<String, ByteBuffer>();
// 缓存没读取完的文件名
private Map<String, ByteBuffer> filenameByClient = new ConcurrentHashMap<String, ByteBuffer>();
// 缓存没读取完的文件大小
private Map<String, ByteBuffer> fileLengthByClient = new ConcurrentHashMap<String, ByteBuffer>();
// 缓存没读取完的文件
private Map<String, ByteBuffer> fileByClient = new ConcurrentHashMap<String, ByteBuffer>();
public DataNodeNIOServer(NameNodeRpcClient rpcClient) {
this.rpcClient = rpcClient;
init();
}
@Override
public void run() {
while (true) {
try {
// 阻塞等待
selector.select();
Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
while (keysIterator.hasNext()) {
SelectionKey key = (SelectionKey) keysIterator.next();
keysIterator.remove();
handleEvent(key);
}
} catch (Throwable t) {
t.printStackTrace();
}
}
}
/*-------------------------------------------------PRIVATE METHOD----------------------------------------------*/
private void init() {
ServerSocketChannel serverChannel = null;
try {
selector = Selector.open();
serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
serverChannel.socket().bind(new InetSocketAddress(NIO_PORT), 100);
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
// 创建3个缓冲队列
for (int i = 0; i < 3; i++) {
queues.add(new LinkedBlockingQueue<SelectionKey>());
}
// 创建三个工作线程,每个线程分配一个队列
for (int i = 0; i < 3; i++) {
new Worker(queues.get(i)).start();
}
System.out.println("NIOServer已经启动,开始监听端口:" + NIO_PORT);
} catch (IOException e) {
e.printStackTrace();
}
}
private void handleEvent(SelectionKey key) throws Exception {
SocketChannel channel = null;
try {
// 1.建立连接
if (key.isAcceptable()) {
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
channel = serverSocketChannel.accept();
if (channel != null) {
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
}
}
// 2.读取请求
else if (key.isReadable()) {
channel = (SocketChannel) key.channel();
// 根据客户端IP地址进行hash,即同一个客户端的请求均入同一个队列
String remoteAddr = channel.getRemoteAddress().toString();
int queueIndex = remoteAddr.hashCode() % queues.size();
queues.get(queueIndex).put(key);
}
} catch (Throwable t) {
t.printStackTrace();
if (channel != null) {
channel.close();
}
}
}
//...
}
上述代码需要特别注意cachedRequestMap
,它里面保存了上一次未处理完的请求InflightRequest
,可能是上传文件请求,也可能是下载文件请求。
/**
* 缓存的请求数据
*/
class InflightRequest {
// 文件名,以前缀分隔符开始,比如/dir/enclosure/qq.jpg
private String filename;
// 文件总大小
private Long filesize;
// 已读取的大小
private Long hasReadedSize;
// 请求类型
private Integer requestType;
public InflightRequest() {
}
//省略get/set...
}
2.2 Worker线程
我们来看Worker线程如何对接收到SocketChannel进行处理。本质就是基于Java NIO的SDK对我们约定好的报文格式进行处理:
- 首先,从自己对应的队列中出队一个元素;
- 然后,解析请求头中的类型,根据不同的类型分别处理。
下面的代码涉及了大量的针对拆包问题的处理代码,我不赘述了,读者可以看代码自己体悟理解,核心思路就是用一个Map缓存未处理完的ByteBuffer:
/**
* 工作线程
*/
class Worker extends Thread {
private LinkedBlockingQueue<SelectionKey> queue;
public Worker(LinkedBlockingQueue<SelectionKey> queue) {
this.queue = queue;
}
@Override
public void run() {
while (true) {
SocketChannel channel = null;
try {
// 出队一个元素
SelectionKey key = queue.take();
channel = (SocketChannel) key.channel();
if (!channel.isOpen()) {
channel.close();
continue;
}
handleRequest(channel, key);
} catch (Exception e) {
e.printStackTrace();
if (channel != null) {
try {
channel.close();
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
}
private void handleRequest(SocketChannel channel, SelectionKey key) throws Exception {
String client = channel.getRemoteAddress().toString();
System.out.println("接收到客户端的请求:" + client);
// 1.针对请求的拆包问题处理
if (cachedRequestMap.containsKey(client)) {
System.out.println("上一次上传文件请求出现拆包问题,本次继续执行文件上传操作......");
handleSendFileRequest(channel, key);
return;
}
// 2.处理请求类型
Integer requestType = getRequestType(channel);
if (requestType == null) {
return;
}
System.out.println("从请求中解析出来请求类型:" + requestType);
// 上传文件请求
if (SEND_FILE.equals(requestType)) {
handleSendFileRequest(channel, key);
}
// 下载文件请求
else if (READ_FILE.equals(requestType)) {
handleReadFileRequest(channel, key);
}
}
private void handleReadFileRequest(SocketChannel channel, SelectionKey key) throws IOException {
String client = channel.getRemoteAddress().toString();
// 从请求中解析文件名
String filename = getFilename(channel);
System.out.println("从网络请求中解析出来文件名:" + filename);
if (filename == null) {
return;
}
File file = new File(DATA_DIR + filename);
Long fileLength = file.length();
FileInputStream imageIn = new FileInputStream(DATA_DIR + filename);
FileChannel imageChannel = imageIn.getChannel();
// 循环不断的从channel里读取数据,并写入磁盘文件
ByteBuffer buffer = ByteBuffer.allocate(
8 + Integer.valueOf(String.valueOf(fileLength)));
buffer.putLong(fileLength);
int hasReadImageLength = imageChannel.read(buffer);
System.out.println("从本次磁盘文件中读取了" + hasReadImageLength + " bytes的数据");
buffer.rewind();
int sent = channel.write(buffer);
System.out.println("将" + sent + " bytes的数据发送给了客户端.....");
imageChannel.close();
imageIn.close();
// 判断一下,如果已经读取完毕,就返回一个成功给客户端
if (hasReadImageLength == fileLength) {
System.out.println("文件发送完毕,给客户端: " + client);
cachedRequestMap.remove(client);
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
}
}
private Integer getRequestType(SocketChannel channel) throws IOException {
Integer requestType = null;
String client = channel.getRemoteAddress().toString();
if (getCachedRequest(client).getRequestType() != null) {
return getCachedRequest(client).getRequestType();
}
// 对请求类型的拆包问题进行处理
ByteBuffer requestTypeBuffer = null;
if (requestTypeByClient.containsKey(client)) {
requestTypeBuffer = requestTypeByClient.get(client);
} else {
requestTypeBuffer = ByteBuffer.allocate(4);
}
channel.read(requestTypeBuffer);
if (!requestTypeBuffer.hasRemaining()) {
requestTypeBuffer.rewind();
requestType = requestTypeBuffer.getInt();
requestTypeByClient.remove(client);
InflightRequest cachedRequest = getCachedRequest(client);
cachedRequest.setRequestType(requestType);
} else {
requestTypeByClient.put(client, requestTypeBuffer);
}
return requestType;
}
private Long getHasReadFileLength(SocketChannel channel) throws Exception {
String client = channel.getRemoteAddress().toString();
if (getCachedRequest(client).getHasReadedSize() != null) {
return getCachedRequest(client).getHasReadedSize();
}
return 0L;
}
private void handleSendFileRequest(SocketChannel channel, SelectionKey key) throws Exception {
String client = channel.getRemoteAddress().toString();
// 1.从请求中解析文件名
String filename = getFilename(channel);
System.out.println("从网络请求中解析出来文件名:" + filename);
if (filename == null) {
return;
}
// 2.从请求中解析文件大小
Long fileLength = getFileLength(channel);
System.out.println("从网络请求中解析出来文件大小:" + fileLength);
if (fileLength == null) {
return;
}
// 获取已经读取的文件大小
long hasReadImageLength = getHasReadFileLength(channel);
System.out.println("初始化已经读取的文件大小:" + hasReadImageLength);
// 构建针对本地文件的输出流
FileOutputStream imageOut = null;
FileChannel imageChannel = null;
try {
imageOut = new FileOutputStream(DATA_DIR + "" + filename);
imageChannel = imageOut.getChannel();
imageChannel.position(imageChannel.size());
System.out.println("对本地磁盘文件定位到position=" + imageChannel.size());
// 循环不断的从channel里读取数据,并写入磁盘文件
ByteBuffer fileBuffer = null;
if (fileByClient.containsKey(client)) {
fileBuffer = fileByClient.get(client);
} else {
fileBuffer = ByteBuffer.allocate(Integer.valueOf(String.valueOf(fileLength)));
}
hasReadImageLength += channel.read(fileBuffer);
if (!fileBuffer.hasRemaining()) {
fileBuffer.rewind();
int written = imageChannel.write(fileBuffer);
fileByClient.remove(client);
System.out.println("本次文件上传完毕,将" + written + " bytes的数据写入本地磁盘文件.......");
ByteBuffer outBuffer = ByteBuffer.wrap("SUCCESS".getBytes());
channel.write(outBuffer);
cachedRequestMap.remove(client);
System.out.println("文件读取完毕,返回响应给客户端: " + client);
// 增量上报
rpcClient.deltaReportDataNodeInfo(filename, hasReadImageLength);
System.out.println("增量上报收到的文件副本给NameNode节点......");
key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
} else {
fileByClient.put(client, fileBuffer);
getCachedRequest(client).setHasReadedSize(hasReadImageLength);
System.out.println("本次文件上传出现拆包问题,缓存起来,下次继续读取.......");
return;
}
} finally {
imageChannel.close();
imageOut.close();
}
}
private Long getFileLength(SocketChannel channel) throws IOException {
Long fileLength = null;
String client = channel.getRemoteAddress().toString();
if (getCachedRequest(client).getFilesize() != null) {
return getCachedRequest(client).getFilesize();
} else {
ByteBuffer fileLengthBuffer = null;
if (fileLengthByClient.get(client) != null) {
fileLengthBuffer = fileLengthByClient.get(client);
} else {
fileLengthBuffer = ByteBuffer.allocate(8);
}
channel.read(fileLengthBuffer);
if (!fileLengthBuffer.hasRemaining()) {
fileLengthBuffer.rewind();
fileLength = fileLengthBuffer.getLong();
fileLengthByClient.remove(client);
getCachedRequest(client).setFilesize(fileLength);
} else {
fileLengthByClient.put(client, fileLengthBuffer);
}
}
return fileLength;
}
private String getFilename(SocketChannel channel) throws IOException {
String client = channel.getRemoteAddress().toString();
if (getCachedRequest(client).getFilename() != null) {
return getCachedRequest(client).getFilename();
} else {
Integer filenameLength = null;
String filename = null;
// 读取文件名的大小
if (!filenameByClient.containsKey(client)) {
ByteBuffer filenameLengthBuffer = null;
if (filenameLengthByClient.containsKey(client)) {
filenameLengthBuffer = filenameLengthByClient.get(client);
} else {
filenameLengthBuffer = ByteBuffer.allocate(4);
}
channel.read(filenameLengthBuffer);
if (!filenameLengthBuffer.hasRemaining()) {
filenameLengthBuffer.rewind();
filenameLength = filenameLengthBuffer.getInt();
filenameLengthByClient.remove(client);
} else {
filenameLengthByClient.put(client, filenameLengthBuffer);
return null;
}
}
// 读取文件名
ByteBuffer filenameBuffer = null;
if (filenameByClient.containsKey(client)) {
filenameBuffer = filenameByClient.get(client);
} else {
filenameBuffer = ByteBuffer.allocate(filenameLength);
}
channel.read(filenameBuffer);
if (!filenameBuffer.hasRemaining()) {
filenameBuffer.rewind();
filename = new String(filenameBuffer.array());
filenameByClient.remove(client);
} else {
filenameByClient.put(client, filenameBuffer);
}
return filename;
}
}
}
另外需要注意的是,上述代码,解析完文件内容并持久化以后,调用NameNodeRpcClient.deltaReportDataNodeInfo()
方法进行一次DataNode增量信息上报,关于信息上报的实现,我在下一章讲解。
三、总结
本章,我对分布式文件系统的文件上传/下载的功能基于Java NIO进行了实现。Java NIO本身的使用并不困难,难点在于对 粘包 和 拆包 问题的处理。本章,我对粘包处理的实现并不完善,后续专栏,我可能会对Netty进行讲解,Netty对于 粘包 和 拆包 问题具有一整套完善的解决机制。