2023-07-30
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/320

分布式文件系统的文件存储的最后一块内容,就是实现DataNode的信息上报。我在前面章节已经讲过,DataNode信息上报分为 增量信息上报全量信息上报 ,增量信息上报在文件上传完成后由DataNodeNIOServer调用,全量信息上报则需要在DataNode首次注册时完成。

一、信息上报

我们首先来看DataNode的启动。DataNode内部封装了很多核心组件,与信息上报最相关的就是LeaseManagerStorageManager

    /**
     * DataNode启动类
     */
    public class DataNode {
    
        private volatile Boolean isRunning;
    
        // 与NameNode进行RPC通信的组件
        private NameNodeRpcClient rpcClient;
    
        // 负责文件存储管理的组件
        private StorageManager storageManager;
    
        // 负责心跳和注册管理的组件
        private LeaseManager leaseManager;
    
        // 负责NIO通信的组件
        private DataNodeNIOServer nioServer;
    
        public static void main(String[] args) {
            DataNode datanode = new DataNode();
            datanode.initialize();
            datanode.start();
        }
    
        /**
         * 初始化DataNode
         */
        private void initialize() {
            this.rpcClient = new NameNodeRpcClient();
            this.storageManager = new StorageManager();
    
            this.nioServer = new DataNodeNIOServer(rpcClient);
            this.leaseManager = new LeaseManager(rpcClient, storageManager);
    
            // 1.启动后立即进行一次注册
            leaseManager.register();
            // 2.开启心跳
            leaseManager.heartbeat();
            // 3.开启NIO Server
            nioServer.start();
        }
    
        /**
         * 运行DataNode
         */
        private void start() {
            this.isRunning = true;
            try {
                while (isRunning) {
                    Thread.sleep(1000);
                }
            } catch (Exception e) {
                this.isRunning = false;
                e.printStackTrace();
            }
        }
    }

1.1 全量上报

DataNode初始化过程中,会立即调用LeaseManager的register方法进行一次服务注册,服务注册成功后就会进行一次全量DataNode信息上报:

    /**
     * Data的服务注册和心跳管理组件
     */
    public class LeaseManager {
    
        // 首次注册成功
        public static final Integer REGISTER_FIRST_SUCCESS = 10000;
    
        // 已经注册过
        public static final Integer REGISTER_EXIST = 10001;
    
        // 首次心跳(还没注册)
        public static final Integer RENEW_FIRST = 10003;
    
        // 已经存在且心跳成功
        public static final Integer RENEW_SUCCESS = 10004;
    
        private NameNodeRpcClient rpcClient;
    
        private StorageManager storageManager;
    
        public LeaseManager(NameNodeRpcClient rpcClient, StorageManager storageManager) {
            this.rpcClient = rpcClient;
            this.storageManager = storageManager;
        }
    
        public Boolean register() {
            RegisterResponse response = rpcClient.register();
            if (response.getStatus() == REGISTER_FIRST_SUCCESS) {
                // 首次注册成功
                System.out.println("首次注册成功,需要全量上报存储信息......");
                DataNodeInfo dataNodeInfo = storageManager.getStorageInfo();
                rpcClient.fullyReportDataNodeInfo(dataNodeInfo);
                return true;
            } else {
                // 节点已经存在不能重复注册
                System.out.println("节点已注册,不需要全量上报存储信息......");
                return false;
            }
        }
    
        public void heartbeat() {
            new HeartbeatThread().start();
        }
    
        /**
         * 负责心跳的线程
         */
        private class HeartbeatThread extends Thread {
            @Override
            public void run() {
                while (true) {
                    try {
                        // 发送心跳
                        HeartbeatResponse response = rpcClient.heartbeat();
                        if (response.getStatus() == RENEW_FIRST) {
                            //心跳失败,节点还没注册
                            register();
                        }
                        Thread.sleep(30 * 1000);
                    } catch (Exception e) {
                        System.out.println("当前NameNode不可用,心跳失败.......");
                    }
                }
            }
        }
    }

我们来思考一下,NameNode接受到DataNode的服务注册请求后,会做哪些判断和处理?

  1. NameNode缓存有该DataNode的信息,也就说该DataNode已经注册过,则不再进行全量上报;
  2. NameNode中不存在该DataNode的信息,可能是DataNode从未注册,也可能是DataNode注册后长时间未发送心跳被剔除了,则需要进行全量上报;

针对这两类请求,NameNode会返回不同的响应码,而DataNode就需要针对首次注册的响应做处理。全量上报所需的DataNode信息由StorageManager组件生成:

    /**
     * 负责文件存储管理的组件
     */
    public class StorageManager {
    
        /**
         * 获取DataNode的存储信息
         */
        public DataNodeInfo getStorageInfo() {
            DataNodeInfo dataNodeInfo = new DataNodeInfo();
            File rootDir = new File(DATA_DIR);
            File[] children = rootDir.listFiles();
            if (children != null || children.length > 0) {
                for (File child : children) {
                    scanFiles(child, dataNodeInfo);
                }
            }
            return dataNodeInfo;
        }
    
        /*---------------------------------PRIVATE METHOD-------------------------------*/
    
        /**
         * 递归扫描文件信息
         *
         * @param rootDir
         */
        private void scanFiles(File rootDir, DataNodeInfo dataNodeInfo) {
            if (rootDir.isFile()) {
                String path = rootDir.getPath();
                path = path.substring(DATA_DIR.length());
                path = path.replace("\\", "/"); // /image/product/iphone.jpg
                dataNodeInfo.addFilename(path);
                dataNodeInfo.addStoredDataSize(rootDir.length());
                return;
            }
            File[] children = rootDir.listFiles();
            if (children == null || children.length == 0) {
                return;
            }
            for (File child : children) {
                scanFiles(child, dataNodeInfo);
            }
        }
    }

上述包含两部分:

  1. 当前DataNode存储的所有文件名列表;
  2. 当前DataNode存储的所有文件总大小。

这些信息最终会通过NameNodeRpcClient的RPC调用发送给Name Node:

    // NameNodeRpcClient.java
    /**
     * 向NameNode全量上报元数据信息
     */
    public void fullyReportDataNodeInfo(DataNodeInfo dataNodeInfo) {
        if (dataNodeInfo == null) {
            System.out.println("当前没有存储任何文件,不需要全量上报.....");
            return;
        }
        FullyReportRequest request = FullyReportRequest.newBuilder()
                .setIp(DATANODE_IP)
                .setHostname(DATANODE_HOSTNAME)
                .setFilenameList(JSONArray.toJSONString(dataNodeInfo.getFilenames()))
                .setStoredDataSize(dataNodeInfo.getStoredDataSize())
                .build();
        namenode.fullyReportDataNodeInfo(request);
        System.out.println("全量上报DataNode信息:" + dataNodeInfo);
    }

另外,我们还需要注意下,如果DataNode初始化过程中的那次服务注册失败了,没有完成全量信息的上报。那么,在后续的心跳过程中,如果发现NameNode侧不存在当前DataNode的信息,也会进行全量信息上报:

    /**
     * 负责心跳的线程
     */
    private class HeartbeatThread extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    // 发送心跳
                    HeartbeatResponse response = rpcClient.heartbeat();
                    if (response.getStatus() == RENEW_FIRST) {
                        //心跳失败,节点还没注册
                        register();
                    }
                    Thread.sleep(30 * 1000);
                } catch (Exception e) {
                    System.out.println("当前NameNode不可用,心跳失败.......");
                }
            }
        }
    }

1.2 增量上报

DataNode增量信息上报,在文件上传的流程中完成:

    // DataNodeNIOServer.java
    
    /**
     * 工作线程
     */
    class Worker extends Thread {
        @Override
        public void run() {
            while (true) {
                SocketChannel channel = null;
                try {
                    //...
    
                    // 5.3 判断是否读取完毕
                    if (hasReadImageLength == filesize) {
                        ByteBuffer outBuffer = ByteBuffer.wrap("SUCCESS".getBytes());
                        channel.write(outBuffer);
                        cachedRequest.remove(remoteAddr);
                        System.out.println("文件读取完毕,返回响应给客户端: " + remoteAddr);
    
                        // 增量上报DataNode元数据
                        rpcClient.deltaReportDataNodeInfo(filename, filesize);
                        System.out.println("增量上报收到的文件副本给NameNode节点......");
                    } else {
                        InflightRequest request = new InflightRequest(filename, filesize, hasReadImageLength);
                        cachedRequest.put(remoteAddr, request);
                        key.interestOps(SelectionKey.OP_READ);
                        System.out.println("文件没有读取完毕,等待下一次OP_READ请求,缓存文件:" + request);
                    }
                } catch (Exception e) {
                    //...
                }
            }
        }
    }

最终由组件NameNodeRpcClient调用RPC接口上送本次DataNode接受到的文件信息:

    // NameNodeRpcClient.java
    
    /**
     * 向NameNode增量上报元数据信息
     */
    public void deltaReportDataNodeInfo(String filename, long filesize) {
        DeltaReportRequest request = DeltaReportRequest.newBuilder()
                .setHostname(DATANODE_HOSTNAME)
                .setIp(DATANODE_IP)
                .setFilename(filename)
                .setFilesize(filesize)
                .build();
        namenode.deltaReportDataNodeInfo(request);
        System.out.println("增量上报DataNode信息:" + filename + ", " + filesize);
    }

二、总结

本章,我对分布式文件系统的DataNode节点的信息上报功能进行了拆分讲解。至此,整个 文件存储 模块的功能就已经实现了。

阅读全文