2023-07-30
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/320

上一章,我对分布式文件系统的存储架构进行了整体讲解。我们回顾一下,dfs-client客户端首先需要通过RPC调用获取DataNode节点信息,然后再基于Java NIO完成文件的上传和下载。本章,我就来实现Java NIO这块的功能。

一、DFS客户端接口

首先,我们来考虑下如何基于Java NIO实现DFS客户端的文件上传和下载功能。DFS客户端需要提供两个接口,并整合上一章讲到的RPC调用:

    /**
     * dfs客户端接口
     */
    public interface FileSystem {
    
        /**
         * 上传一个文件
         *
         * @param file     file byte array
         * @param filename 文件名,以如如文件分隔符开头,则移除
         * @param fileSize file size
         */
        Boolean upload(byte[] file, String filename, long fileSize) throws Exception;
    
    
        /**
         * 下载一个文件
         *
         * @param filename 文件名
         * @return 文件的字节数组
         * @throws Exception
         */
        byte[] download(String filename) throws Exception;
    }

1.1 上传流程

文件上传流程如下:

  1. 客户端将待上传的文件元数据通过RPC接口发送给NameNode;
  2. 客户端收到成功响应后,调用RPC接口获取可用的DataNode节点;
  3. 客户端遍历DataNode,依次基于Java NIO完成文件上传。
    /**
     * dfs客户端接口的实现类
     */
    public class FileSystemImpl implements FileSystem {
    
        private NameNodeRpcClient rpcClient = new NameNodeRpcClient();
    
        public Boolean upload(byte[] file, String filename, long fileSize) throws Exception {
            // 1.RPC接口发送文件元数据
            if (!filename.startsWith(File.separator)) {
                filename = File.separator + filename;
            }
            if (!rpcClient.createFile(filename)) {
                return false;
            }
    
            // 2.RPC接口获取DataNode
            String datanodesJson = rpcClient.allocateDataNodes(filename, fileSize);
            System.out.println(datanodesJson);
            if (datanodesJson == null) {
                return false;
            }
    
            // 3.遍历DataNode,依次上传文件
            JSONArray datanodes = JSONArray.parseArray(datanodesJson);
            for (int i = 0; i < datanodes.size(); i++) {
                JSONObject datanode = datanodes.getJSONObject(i);
                String hostname = datanode.getString("hostname");
                int nioPort = datanode.getIntValue("nioPort");
                // NIO客户端
                DFSNIOClient.sendFile(hostname, nioPort, file, filename, fileSize);
            }
            return true;
        }
    
    }

1.2 下载流程

文件下载流程如下:

  1. 客户端通过调用RPC接口,从NameNode获取待下载文件对应的可用DataNode节点;
  2. 客户端解析出DataNode节点信息;
  3. 客户端基于Java NIO完成文件下载。
    // FileSystemImpl.java
    
    public byte[] download(String filename) throws Exception {
        // 1.获取待下载文件对应的可用DataNode节点
        String datanode = rpcClient.getDataNodeForFile(filename);
        System.out.println("NameNode分配用来下载文件的数据节点:" + datanode);
    
        // 2.解析DataNode信息
        JSONObject jsonObject = JSONObject.parseObject(datanode);
        String hostname = jsonObject.getString("hostname");
        Integer nioPort = jsonObject.getInteger("nioPort");
    
        // 3.基于Java NIO下载文件
        return DFSNIOClient.readFile(hostname, nioPort, filename);
    }

二、文件上传

接下来,我们就要正式基于Java NIO来实现文件上传和下载功能了,先从文件上传开始。关于Java NIO,在使用上其实并没有什么难点,无非就是对于NIO包的使用。原生NIO的使用,最核心也是最难的一点在于对于 拆包粘包 问题的处理。

2.1 客户端

首先,我们需要在dsf-client客户端工程中新增一个NIO客户端类——DFSNIOClient,并提供文件上传的sendfile接口。由于NIO底层基于TCP流进行传输,所以我们需要自定义报文格式:

    requestType | filenameLength | filename | fileSize | file

也就是说,一个完整的文件数据包应包含以下内容:

  • 4字节请求类型标识:1-文件上传 2-文件下载;
  • 4字节文件名大小标识;
  • 文件名内容;
  • 8字节文件内容大小标识;
  • 文件内容。

NIO客户端发送文件流的代码并不复杂,遵循Java NIO编程的模板,大部分都是一些Boilerplate Code

  1. 创建SocketChannel,指定要连接的IP和端口;
  2. 在Channel上注册Selector,并关注OP_CONNECT事件;
  3. 调用Selector.select()阻塞等待,然后遍历各个SelectionKey,按照不同事件类型进行处理。

我们需要重点关注的是发送请求时对拆包的处理,主要采取的处理手段有两点:

  1. 自定义报文格式;
  2. 缓存ByteBuffer,判断一个完整数据包是否发送完毕。
    /**
     * NIO Client,负责跟DataNode进行网络通信
     */
    public class DFSNIOClient {
    
        // 文件上传
        public static final Integer SEND_FILE = 1;
        // 文件下载
        public static final Integer READ_FILE = 2;
    
        /**
         * 上传文件
         */
        public static void sendFile(String hostname, int nioPort, byte[] file, String filename, long fileSize) {
    
            Selector selector = null;
            SocketChannel channel = null;
            ByteBuffer buffer = null;
    
            try {
                // 1.与建立短连接
                selector = Selector.open();
                channel = SocketChannel.open();
                // 非阻塞模式,调用完connect立即返回
                channel.configureBlocking(false);
                channel.connect(new InetSocketAddress(hostname, nioPort));
                // 关注SocketChannel的OP_CONNECT事件
                channel.register(selector, SelectionKey.OP_CONNECT);
    
                boolean sending = true;
                while (sending) {
                    // 这里是同步调用,线程会同步等待直到SocketChannel有事件发生,所以NIO是一种“同步非阻塞模式”
                    selector.select();
    
                    // 2.遍历SelectionKey
                    Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                    while (keysIterator.hasNext()) {
                        SelectionKey key = (SelectionKey) keysIterator.next();
                        keysIterator.remove();
    
                        // 3.1 连接建立
                        if (key.isConnectable()) {
                            channel = (SocketChannel) key.channel();
                            // 该channel正在建立TCP连接
                            if (channel.isConnectionPending()) {
                                // 三次握手,直到TCP连接建立完成
                                while (!channel.finishConnect()) {
                                    Thread.sleep(100);
                                }
                            }
                            System.out.println("完成与服务端的连接的建立......");
                        }
                        // 3.2 发送数据到NIO Server
                        else if (key.isWritable()) {
                            if (buffer == null) {
                                // 封装文件请求数据(固定格式:TYPE+文件名大小+文件名+文件大小+文件内容)
                                // requestType | filenameLength | filename | fileSize | file
                                buffer = ByteBuffer.allocate(4 + 4 + filename.getBytes().length + 8 + (int) fileSize);
                                System.out.println("准备发送的数据包大小为:" + buffer.capacity());
    
                                // 4字节请求类型标识:1-文件上传 2-文件下载
                                buffer.putInt(SEND_FILE);
                                // 4字节文件名大小标识
                                buffer.putInt(filename.getBytes().length);
                                // 文件名内容
                                buffer.put(filename.getBytes());
                                // 8字节文件内容大小标识
                                buffer.putLong(fileSize);
                                // 文件内容
                                buffer.put(file);
                                buffer.rewind();
    
                                int sent = channel.write(buffer);
                                System.out.println("已经发送了" + sent + " bytes的数据到服务端去");
    
                                // 这里是对“拆包”问题进行处理”
                                if (buffer.hasRemaining()) {
                                    System.out.println("本次数据包没有发送完毕,下次会继续发送.......");
                                    key.interestOps(SelectionKey.OP_WRITE);
                                } else {
                                    System.out.println("本次数据包发送完毕,准备读取服务端的响应......");
                                    buffer = null;
                                    key.interestOps(SelectionKey.OP_READ);
                                }
                            } else {
                                channel = (SocketChannel) key.channel();
                                int sent = channel.write(buffer);
                                System.out.println("上一次数据包没有发送完毕,本次继续发送了" + sent + " bytes");
                                if (!buffer.hasRemaining()) {
                                    System.out.println("本次数据包没有发送完毕,下次会继续发送.......");
                                    key.interestOps(SelectionKey.OP_READ);
                                }
                            }
                        }
                        // 3.3 接收NIOServer响应
                        else if (key.isReadable()) {
                            channel = (SocketChannel) key.channel();
                            buffer = ByteBuffer.allocate(1024);
                            int len = channel.read(buffer);
                            buffer.flip();
                            if (len > 0) {
                                System.out.println("[" + Thread.currentThread().getName()
                                        + "]收到" + hostname + "的响应:" + new String(buffer.array(), 0, len));
                                sending = false;
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //...
            }
        }
    }

Java NIO,本质是一种”同步非阻塞“模式。Selector在OS底层会采用非阻塞方式监听多个SocketChannel,而对于调用Selector.select()的线程而言,是一种同步调用,调用线程会同步等待直到SocketChannel有事件发生。

注意:上述代码,我没有再对客户端处理Server响应时的”拆包“问题进行处理,因为客户端响应的内容较少,而且后面将文件下载时我会专门针对该问题的处理进行讲解。

三、文件下载

文件下载也是基于NIO进行网络通信。对于客户端和服务端,需要重点注意对”拆包“问题的处理。

3.1 客户端

NIO客户端在进行文件下载时,处理流程如下:

  1. 首先,发送一个请求给服务端,告知要下载的文件,我们还是一样自定义报文格式。
  2. 接着,读取Server的响应,解析字节流。

这里在处理NIO Server的响应时,文件长度、文件内容都可能出现”拆包“问题,所以我用了两个缓存对象——fileLengthBufferfileBuffer,当这两个Buffer没有剩余空间时,代表读完了完整数据。这也是处理拆包问题的一种常见思路:

    // DFSNIOClient.java
    
    /**
     * 下载文件
     */
    public static byte[] readFile(String hostname, Integer nioPort, String filename) {
        ByteBuffer fileLengthBuffer = null;
        Long fileLength = null;
    
        ByteBuffer fileBuffer = null;
        byte[] file = null;
    
        SocketChannel channel = null;
        Selector selector = null;
        try {
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            channel.connect(new InetSocketAddress(hostname, nioPort));
            selector = Selector.open();
            channel.register(selector, SelectionKey.OP_CONNECT);
    
            boolean reading = true;
            while (reading) {
                selector.select();
                Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                while (keysIterator.hasNext()) {
                    SelectionKey key = (SelectionKey) keysIterator.next();
                    keysIterator.remove();
    
                    // 1.建立连接
                    if (key.isConnectable()) {
                        channel = (SocketChannel) key.channel();
                        if (channel.isConnectionPending()) {
                            // 三次握手,直到TCP连接建立完成
                            while (!channel.finishConnect()) {
                                Thread.sleep(100);
                            }
                        }
                        System.out.println("完成与服务端的连接的建立......");
    
                        // 1.1 立即发送一个请求给服务端,告知要下载的文件
                        // requestType | filenameLength | filename | fileSize
                        byte[] filenameBytes = filename.getBytes();
                        ByteBuffer readFileRequest = ByteBuffer.allocate(4 + 4 + filenameBytes.length);
                        readFileRequest.putInt(READ_FILE);
                        readFileRequest.putInt(filenameBytes.length);
                        readFileRequest.put(filenameBytes);
                        readFileRequest.flip();
    
                        channel.write(readFileRequest);
                        System.out.println("发送文件下载的请求过去......");
                        key.interestOps(SelectionKey.OP_READ);
                    }
                    // 2.读取响应内容
                    else if (key.isReadable()) {
                        channel = (SocketChannel) key.channel();
    
                        // 2.1 处理文件大小的拆包
                        if (fileLength == null) {
                            if (fileLengthBuffer == null) {
                                fileLengthBuffer = ByteBuffer.allocate(8);
                            }
                            channel.read(fileLengthBuffer);
                            if (!fileLengthBuffer.hasRemaining()) {
                                fileLengthBuffer.rewind();
                                fileLength = fileLengthBuffer.getLong();
                                System.out.println("从服务端返回数据中解析文件大小:" + fileLength);
                            }
                        }
                        // 2.2 处理文件内容的拆包
                        if (fileLength != null) {
                            if (fileBuffer == null) {
                                fileBuffer = ByteBuffer.allocate(Integer.valueOf(String.valueOf(fileLength)));
                            }
                            int hasRead = channel.read(fileBuffer);
                            System.out.println("从服务端读取了" + hasRead + " bytes的数据出来到内存中");
                            if (!fileBuffer.hasRemaining()) {
                                fileBuffer.rewind();
                                file = fileBuffer.array();
                                System.out.println("最终获取到的文件的大小为" + file.length + " bytes");
                                reading = false;
                            }
                        }
                    }
                }
            }
            return file;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //...
        }
        return null;
    }

二、NIO服务端

文件上传和下载的复杂之处在于DataNode侧的NIO Server的编码,需要考虑TCP的粘包和拆包的问题。我在实现时仅针对拆包问题进行了一定处理(也就说,Server接受文件时,可能分多次接收到了同一个文件的数据流),粘包问题没有考虑,默认认为一个连接就传输一个文件。

DataNode负责进行文件上传处理的组件是DataNodeNIOServer,它会根据客户端地址,将同一个客户端的请求路由到唯一的内存队列中,然后为每个内存队列分配一个工作线程进行处理:

202307302137211981.png

2.1 DataNodeNIOServer

DataNodeNIOServer的主体实现如下,DataNodeNIOServer主线程会监听各类NIO事件,然后通过handleRequest方法进行预处理,最后将成功建立连接的SocketChannel分发到内存队列中,由Woker线程进行处理。

需要注意的是,我这边 用了很多Map来缓存那些因为拆包问题而没发送完的文件内容 ,比如说一个100kb的文件,拆分成了两次来调用handleRequest,第一次是过来80kb,第二次是过来20kb。

因为我们自定义的报文格式是:requestType | filenameLength | filename | fileSize | file,所以无论是文件名、文件大小、还是文件内容,都有可能发生拆包。

    /**
     * DataNode NIO通信组件
     */
    public class DataNodeNIOServer extends Thread {
    
        private NameNodeRpcClient rpcClient;
    
        public static final Integer SEND_FILE = 1;
        public static final Integer READ_FILE = 2;
    
        private Selector selector;
    
        private List<LinkedBlockingQueue<SelectionKey>> queues = new ArrayList<>();
    
        // 缓存的上一次未处理完请求,Key为客户端IP
        private Map<String, InflightRequest> cachedRequestMap = new ConcurrentHashMap<>();
        // 缓存没读取完的请求类型
        private Map<String, ByteBuffer> requestTypeByClient = new ConcurrentHashMap<String, ByteBuffer>();
        // 缓存没读取完的文件名大小
        private Map<String, ByteBuffer> filenameLengthByClient = new ConcurrentHashMap<String, ByteBuffer>();
        // 缓存没读取完的文件名
        private Map<String, ByteBuffer> filenameByClient = new ConcurrentHashMap<String, ByteBuffer>();
        // 缓存没读取完的文件大小
        private Map<String, ByteBuffer> fileLengthByClient = new ConcurrentHashMap<String, ByteBuffer>();
        // 缓存没读取完的文件
        private Map<String, ByteBuffer> fileByClient = new ConcurrentHashMap<String, ByteBuffer>();
    
        public DataNodeNIOServer(NameNodeRpcClient rpcClient) {
            this.rpcClient = rpcClient;
            init();
        }
    
        @Override
        public void run() {
            while (true) {
                try {
                    // 阻塞等待
                    selector.select();
                    Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                    while (keysIterator.hasNext()) {
                        SelectionKey key = (SelectionKey) keysIterator.next();
                        keysIterator.remove();
                        handleEvent(key);
                    }
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        }
    
        /*-------------------------------------------------PRIVATE METHOD----------------------------------------------*/
    
        private void init() {
            ServerSocketChannel serverChannel = null;
            try {
                selector = Selector.open();
                serverChannel = ServerSocketChannel.open();
                serverChannel.configureBlocking(false);
                serverChannel.socket().bind(new InetSocketAddress(NIO_PORT), 100);
                serverChannel.register(selector, SelectionKey.OP_ACCEPT);
    
                // 创建3个缓冲队列
                for (int i = 0; i < 3; i++) {
                    queues.add(new LinkedBlockingQueue<SelectionKey>());
                }
    
                // 创建三个工作线程,每个线程分配一个队列
                for (int i = 0; i < 3; i++) {
                    new Worker(queues.get(i)).start();
                }
    
                System.out.println("NIOServer已经启动,开始监听端口:" + NIO_PORT);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        private void handleEvent(SelectionKey key) throws Exception {
            SocketChannel channel = null;
    
            try {
                // 1.建立连接
                if (key.isAcceptable()) {
                    ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                    channel = serverSocketChannel.accept();
                    if (channel != null) {
                        channel.configureBlocking(false);
                        channel.register(selector, SelectionKey.OP_READ);
                    }
                }
                // 2.读取请求
                else if (key.isReadable()) {
                    channel = (SocketChannel) key.channel();
                    // 根据客户端IP地址进行hash,即同一个客户端的请求均入同一个队列
                    String remoteAddr = channel.getRemoteAddress().toString();
                    int queueIndex = remoteAddr.hashCode() % queues.size();
                    queues.get(queueIndex).put(key);
                }
            } catch (Throwable t) {
                t.printStackTrace();
                if (channel != null) {
                    channel.close();
                }
            }
        }
        //...
    }

上述代码需要特别注意cachedRequestMap,它里面保存了上一次未处理完的请求InflightRequest,可能是上传文件请求,也可能是下载文件请求。

    /**
     * 缓存的请求数据
     */
    class InflightRequest {
        // 文件名,以前缀分隔符开始,比如/dir/enclosure/qq.jpg
        private String filename;
        // 文件总大小
        private Long filesize;
        // 已读取的大小
        private Long hasReadedSize;
        // 请求类型
        private Integer requestType;
    
        public InflightRequest() {
        }
    
        //省略get/set...
    }

2.2 Worker线程

我们来看Worker线程如何对接收到SocketChannel进行处理。本质就是基于Java NIO的SDK对我们约定好的报文格式进行处理:

  1. 首先,从自己对应的队列中出队一个元素;
  2. 然后,解析请求头中的类型,根据不同的类型分别处理。

下面的代码涉及了大量的针对拆包问题的处理代码,我不赘述了,读者可以看代码自己体悟理解,核心思路就是用一个Map缓存未处理完的ByteBuffer:

    /**
     * 工作线程
     */
    class Worker extends Thread {
        private LinkedBlockingQueue<SelectionKey> queue;
    
        public Worker(LinkedBlockingQueue<SelectionKey> queue) {
            this.queue = queue;
        }
    
        @Override
        public void run() {
            while (true) {
                SocketChannel channel = null;
                try {
                    // 出队一个元素
                    SelectionKey key = queue.take();
                    channel = (SocketChannel) key.channel();
                    if (!channel.isOpen()) {
                        channel.close();
                        continue;
                    }
                    handleRequest(channel, key);
                } catch (Exception e) {
                    e.printStackTrace();
                    if (channel != null) {
                        try {
                            channel.close();
                        } catch (IOException e1) {
                            e1.printStackTrace();
                        }
                    }
                }
            }
        }
    
        private void handleRequest(SocketChannel channel, SelectionKey key) throws Exception {
            String client = channel.getRemoteAddress().toString();
            System.out.println("接收到客户端的请求:" + client);
    
            // 1.针对请求的拆包问题处理
            if (cachedRequestMap.containsKey(client)) {
                System.out.println("上一次上传文件请求出现拆包问题,本次继续执行文件上传操作......");
                handleSendFileRequest(channel, key);
                return;
            }
    
            // 2.处理请求类型
            Integer requestType = getRequestType(channel);
            if (requestType == null) {
                return;
            }
            System.out.println("从请求中解析出来请求类型:" + requestType);
            // 上传文件请求
            if (SEND_FILE.equals(requestType)) {
                handleSendFileRequest(channel, key);
            }
            // 下载文件请求
            else if (READ_FILE.equals(requestType)) {
                handleReadFileRequest(channel, key);
            }
    
        }
    
        private void handleReadFileRequest(SocketChannel channel, SelectionKey key) throws IOException {
            String client = channel.getRemoteAddress().toString();
    
            // 从请求中解析文件名
            String filename = getFilename(channel);
            System.out.println("从网络请求中解析出来文件名:" + filename);
            if (filename == null) {
                return;
            }
    
            File file = new File(DATA_DIR + filename);
            Long fileLength = file.length();
    
            FileInputStream imageIn = new FileInputStream(DATA_DIR + filename);
            FileChannel imageChannel = imageIn.getChannel();
    
            // 循环不断的从channel里读取数据,并写入磁盘文件
            ByteBuffer buffer = ByteBuffer.allocate(
                    8 + Integer.valueOf(String.valueOf(fileLength)));
            buffer.putLong(fileLength);
            int hasReadImageLength = imageChannel.read(buffer);
            System.out.println("从本次磁盘文件中读取了" + hasReadImageLength + " bytes的数据");
    
            buffer.rewind();
            int sent = channel.write(buffer);
            System.out.println("将" + sent + " bytes的数据发送给了客户端.....");
    
            imageChannel.close();
            imageIn.close();
    
            // 判断一下,如果已经读取完毕,就返回一个成功给客户端
            if (hasReadImageLength == fileLength) {
                System.out.println("文件发送完毕,给客户端: " + client);
                cachedRequestMap.remove(client);
                key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
            }
        }
    
        private Integer getRequestType(SocketChannel channel) throws IOException {
            Integer requestType = null;
            String client = channel.getRemoteAddress().toString();
    
            if (getCachedRequest(client).getRequestType() != null) {
                return getCachedRequest(client).getRequestType();
            }
            // 对请求类型的拆包问题进行处理
            ByteBuffer requestTypeBuffer = null;
            if (requestTypeByClient.containsKey(client)) {
                requestTypeBuffer = requestTypeByClient.get(client);
            } else {
                requestTypeBuffer = ByteBuffer.allocate(4);
            }
            channel.read(requestTypeBuffer);
    
            if (!requestTypeBuffer.hasRemaining()) {
                requestTypeBuffer.rewind();
                requestType = requestTypeBuffer.getInt();
    
                requestTypeByClient.remove(client);
                InflightRequest cachedRequest = getCachedRequest(client);
                cachedRequest.setRequestType(requestType);
            } else {
                requestTypeByClient.put(client, requestTypeBuffer);
            }
            return requestType;
        }
    
        private Long getHasReadFileLength(SocketChannel channel) throws Exception {
            String client = channel.getRemoteAddress().toString();
            if (getCachedRequest(client).getHasReadedSize() != null) {
                return getCachedRequest(client).getHasReadedSize();
            }
            return 0L;
        }
    
        private void handleSendFileRequest(SocketChannel channel, SelectionKey key) throws Exception {
            String client = channel.getRemoteAddress().toString();
    
            // 1.从请求中解析文件名
            String filename = getFilename(channel);
            System.out.println("从网络请求中解析出来文件名:" + filename);
            if (filename == null) {
                return;
            }
            // 2.从请求中解析文件大小
            Long fileLength = getFileLength(channel);
            System.out.println("从网络请求中解析出来文件大小:" + fileLength);
            if (fileLength == null) {
                return;
            }
            // 获取已经读取的文件大小
            long hasReadImageLength = getHasReadFileLength(channel);
            System.out.println("初始化已经读取的文件大小:" + hasReadImageLength);
    
            // 构建针对本地文件的输出流
            FileOutputStream imageOut = null;
            FileChannel imageChannel = null;
    
            try {
                imageOut = new FileOutputStream(DATA_DIR + "" + filename);
                imageChannel = imageOut.getChannel();
                imageChannel.position(imageChannel.size());
                System.out.println("对本地磁盘文件定位到position=" + imageChannel.size());
    
                // 循环不断的从channel里读取数据,并写入磁盘文件
                ByteBuffer fileBuffer = null;
                if (fileByClient.containsKey(client)) {
                    fileBuffer = fileByClient.get(client);
                } else {
                    fileBuffer = ByteBuffer.allocate(Integer.valueOf(String.valueOf(fileLength)));
                }
    
                hasReadImageLength += channel.read(fileBuffer);
                if (!fileBuffer.hasRemaining()) {
                    fileBuffer.rewind();
                    int written = imageChannel.write(fileBuffer);
                    fileByClient.remove(client);
                    System.out.println("本次文件上传完毕,将" + written + " bytes的数据写入本地磁盘文件.......");
    
                    ByteBuffer outBuffer = ByteBuffer.wrap("SUCCESS".getBytes());
                    channel.write(outBuffer);
                    cachedRequestMap.remove(client);
                    System.out.println("文件读取完毕,返回响应给客户端: " + client);
    
                    // 增量上报
                    rpcClient.deltaReportDataNodeInfo(filename, hasReadImageLength);
                    System.out.println("增量上报收到的文件副本给NameNode节点......");
    
                    key.interestOps(key.interestOps() & ~SelectionKey.OP_READ);
                } else {
                    fileByClient.put(client, fileBuffer);
                    getCachedRequest(client).setHasReadedSize(hasReadImageLength);
                    System.out.println("本次文件上传出现拆包问题,缓存起来,下次继续读取.......");
                    return;
                }
            } finally {
                imageChannel.close();
                imageOut.close();
            }
        }
    
        private Long getFileLength(SocketChannel channel) throws IOException {
            Long fileLength = null;
            String client = channel.getRemoteAddress().toString();
    
            if (getCachedRequest(client).getFilesize() != null) {
                return getCachedRequest(client).getFilesize();
            } else {
                ByteBuffer fileLengthBuffer = null;
                if (fileLengthByClient.get(client) != null) {
                    fileLengthBuffer = fileLengthByClient.get(client);
                } else {
                    fileLengthBuffer = ByteBuffer.allocate(8);
                }
    
                channel.read(fileLengthBuffer);
                if (!fileLengthBuffer.hasRemaining()) {
                    fileLengthBuffer.rewind();
                    fileLength = fileLengthBuffer.getLong();
                    fileLengthByClient.remove(client);
                    getCachedRequest(client).setFilesize(fileLength);
                } else {
                    fileLengthByClient.put(client, fileLengthBuffer);
                }
            }
            return fileLength;
        }
    
        private String getFilename(SocketChannel channel) throws IOException {
            String client = channel.getRemoteAddress().toString();
            if (getCachedRequest(client).getFilename() != null) {
                return getCachedRequest(client).getFilename();
            } else {
                Integer filenameLength = null;
                String filename = null;
    
                // 读取文件名的大小
                if (!filenameByClient.containsKey(client)) {
                    ByteBuffer filenameLengthBuffer = null;
                    if (filenameLengthByClient.containsKey(client)) {
                        filenameLengthBuffer = filenameLengthByClient.get(client);
                    } else {
                        filenameLengthBuffer = ByteBuffer.allocate(4);
                    }
                    channel.read(filenameLengthBuffer);
    
                    if (!filenameLengthBuffer.hasRemaining()) {
                        filenameLengthBuffer.rewind();
                        filenameLength = filenameLengthBuffer.getInt();
                        filenameLengthByClient.remove(client);
                    } else {
                        filenameLengthByClient.put(client, filenameLengthBuffer);
                        return null;
                    }
                }
    
                // 读取文件名
                ByteBuffer filenameBuffer = null;
                if (filenameByClient.containsKey(client)) {
                    filenameBuffer = filenameByClient.get(client);
                } else {
                    filenameBuffer = ByteBuffer.allocate(filenameLength);
                }
                channel.read(filenameBuffer);
    
                if (!filenameBuffer.hasRemaining()) {
                    filenameBuffer.rewind();
                    filename = new String(filenameBuffer.array());
                    filenameByClient.remove(client);
                } else {
                    filenameByClient.put(client, filenameBuffer);
                }
                return filename;
            }
        }
    }

另外需要注意的是,上述代码,解析完文件内容并持久化以后,调用NameNodeRpcClient.deltaReportDataNodeInfo()方法进行一次DataNode增量信息上报,关于信息上报的实现,我在下一章讲解。

三、总结

本章,我对分布式文件系统的文件上传/下载的功能基于Java NIO进行了实现。Java NIO本身的使用并不困难,难点在于对 粘包拆包 问题的处理。本章,我对粘包处理的实现并不完善,后续专栏,我可能会对Netty进行讲解,Netty对于 粘包拆包 问题具有一整套完善的解决机制。

阅读全文