2023-07-30
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/320

从本章开始,我将对分布式文件系统的文件上传功能进行讲解。我们的分布式文件系统的大脑是NameNode,它负责管理整个集群的元数据,而真正负责数据存储的是DataNode。所以,我们在设计文件存储功能时,需要重点考虑以下几个方面的内容:

  1. 如何在DataNode集群中分布式地存储各种小文件?
  2. 如何保证DataNode的可用性?
  3. NameNode如何感知DataNode集群的信息?
  4. 客户端如何与NameNode、DataNode通信,完成文件的上传和下载?

解决了上述四个问题,我们的分布式文件系统的存储架构也就浮现出来了。本章,我先以文件上传和下载功能为例,讲解分布式文件系统的文件存储全流程,后续章节再带领大家逐步实现各个功能点。

由于文件存储这部分的内容涉及重构前面章节的存量代码,所以我重新上传了一份代码到Gitee上,文件存储部分的代码:https://gitee.com/ressmix/source-code/tree/master/5.dfs/2.storage

一、整体架构

对于分布式文件系统的数据存储来说,一般都是采用数据分散集群的架构。我们先来看 文件上传 的整体流程:

202307302137045501.png

  1. 首先,DFS客户端与NameNode通信,确认文件存储在哪些DataNode节点上;
  2. 接着,DFS客户端直接与DataNode通信,进行文件的多副本上传;
  3. 同时,DataNode会不断向NameNode上报自己的存储元数据信息,以便NameNode对DataNode节点的上下线状态进行感知以及负载均衡等。

再来看 文件下载 的整体流程:

202307302137080102.png

  1. 首先,DFS客户端与NameNode通信,确认要下载的文件分布在哪些DataNode节点上;
  2. NameNode会根据负载均衡算法,选择一个DataNode节点并返回其信息;
  3. 最后,DFS客户端通过NIO网络通信,从指定DataNode节点完成文件下载。

关于数据分散集群,我在《分布式系统从理论到实战》系列的专栏中进行过详细讲解,不熟悉的读者可以先去看一看。

1.1 元数据管理

DFS客户端会提供文件上传和下载接口,通过RPC方式与NameNode进行通信,获取需要的DataNode节点信息。对于文件文件,NameNode接受到DFS的请求后,会在内存中维护文件目录树,并写edits log日志。BackupNode同步到日志后,进行回放,并维护fsimage快照文件。所以,这套机制和我前面章节讲解的元数据管理完全相同。

1.2 高可用

DataNode负责实际的数据存储,所以不能是单点。而保证DataNode高可用的最基本思路就是 冗余 ,那么到底需要几个副本来冗余存储呢?

我们来看下Hadoop HDFS的设计,HDFS在设计节点冗余时,考虑了 机架和机房 的设计,默认使用3个副本,其中2个副本分配在同一个机架上,另外一个副本在别的机架上,这样万一某个机架故障了(机架上的所有机器都会出问题),那别的机架上还有一个副本可用。

我们的分布式文件系统不需要设计得这么复杂,我们采用 双副本模式 ,即一个文件存储两份(用两个对等DataNode节点保存),对于一些中等规模的公司来说已经足够了。

1.3 负载均衡

有了副本冗余机制后,NameNode需要在可用的DataNode节点中进行选择,并响应给客户端,那么如何选择呢?这就涉及到了负载均衡算法。

常见的负载均衡算法有很多,比如Round Robin、负载最小、权重等等。对于文件上传,我们可以根据DataNode的 存储数据量 作负载,也就是说,NameNode会从所有可用的DataNode中选出2个数据量最少的节点,返回给客户端;对于文件下载,我们直接随机选择一个DataNode节点返回。

1.4 数据流传输

DFS客户端接受到NameNode的响应后,需要与指定的DataNode通信,完成文件的实际上传/下载操作。那么,应该采用什么样的传输方式呢?对于文件下载,我们可以直接基于NIO短连接通信来实现。而对于文件上传来说,由于涉及多个副本节点,一般有两种方案:

  1. 管道数据流;
  2. 基于NIO的多副本依次上传。

Hadoop HDFS是采用的管道数据流。所谓管道数据流,就是说客户端只与一个Node通信,Node之间的数据传输由Node自己完成,这种方式可以避免客户端的网络连接过多,负载过重,同时机器之间传输数据的性能要比客户端直连更高。

我们的分布式文件系统没必要搞那么复杂,我直接基于Java NIO来实现DFS客户端与DataNode之间的网络通信。

1.5 信息上报

DataNode需要定期将自己的信息上报给NameNode,包括自己存储的文件列表、每个文件的信息等等,这样NameNode才能对DataNode的元数据进行管理,从而选择合适的节点返回给DFS客户端。

我们可以基于心跳机制实现信息上报,但是需要考虑NameNode和DataNode宕机等异常情况。因为NameNode一旦宕机,内存中保存的DataNode信息就会丢失。

所以,我们可以设计 增量上报全量上报 两种模式。

  • 全量上报:只要NameNode内存中没有DataNode的信息,那么对于DataNode上送的注册/心跳请求,都要求DataNode进行全量上报。这种情况一般有两种可能:
        - NameNode首次启动;
        - DataNode长时间未发送心跳,导致元数据被NameNode剔除。
  • 增量上报:每次DataNode处理完到DFS客户端上传的文件后,立即进行一次增量上报。

二、接口设计

根据对上述分布式文件系统的整体架构分析,我们需要设计以下几类接口:

  1. DFS客户端的文件上传/下载接口;
  2. DataNode信息上报接口;
  3. DataNode心跳和注册接口。

其中DataNode的心跳和注册接口我在前面 元数据管理 的相关章节中已经实现了,后续章节我会根据信息上报的需求对这块的代码进行改造,以支持增量/全量信息上报。

2.1 文件上传接口

文件上传接口分为三部分:

  1. createRPC接口:DFS客户端首先发送RPC请求给NameNode,上送待上传的文件信息。NameNode收到请求后,在内存中维护文件目录树,并写Edits Log日志;
  2. allocateDataNodesRPC接口:DFS客户端接受到create请求的成功响应后,发送要上传的文件元数据信息,包括文件名、路径、文件大小等。NameNode收到请求后,根据负载均衡算法返回DataNode列表;
  3. sendFileNIO文件上传接口:DFS客户端接受到DataNode列表后,基于Java NIO与各个DataNode进行通信,上传文件。

上传三个接口的交互如下图:

202307302137102103.png

2.2 文件下载接口

文件下载接口分为两部分:

  1. getDataNodeForFile接口:DFS客户端首先发送RPC请求给NameNode,上送待下载的文件信息。NameNode收到请求后,随机选择一个DataNode并返回;
  2. readFileNIO文件下载接口:DFS客户端接收到DataNode信息后,基于Java NIO与该DataNode通信,完成文件下载。

202307302137133864.png

2.3 信息上报接口

DataNode的信息上报接口有两个:

  1. deltaReportDataNodeInfo增量上报接口:DataNode接收到客户端上传的文件后,需要调用该接口上报文件信息给NameNode;
  2. fullyReportDataNodeInfo全量上报接口:DataNode在向NameNode发送服务注册和心跳请求时,会根据NameNode侧是否存在自己的元数据信息,选择是否上报节点的文件列表和数据量等信息。

2.4 注册和心跳接口

我们需要对DataNode的服务注册和心跳的逻辑进行改写,以满足信息的增量/全量上报需求:

  1. register服务注册接口:DataNode节点首次启动时,进行服务注册;
  2. heartbeat服务心跳接口:默认每隔30s,DataNode向NameNode发送一次心跳请求。

三、DataNode设计

文件存储的核心逻辑都在DataNode侧,我们的DataNode工程如下:

202307302137159875.png

本节,我初步讲解下上面各个类的功能:

  • DataNodeConfig :顾名思义,就是保存DataNode的配置;
  • DataNodeInfo :DataNode元数据的抽象,包含ip、端口、存储文件列表、存储数据大小等信息;
  • StorageManager :负责文件存储管理的组件,提供方法实时获取DataNode的元数据信息;
  • LeaseManager :负责心跳和注册管理的组件;
  • DataNodeNIOServer :基于Java NIO实现的一个Server,处理文件上传/下载的请求;
  • NameNodeRpcClient :基于gRPC的客户端实现,用于与NameNode通信;
  • DataNode :DataNode启动类。

四、RPC接口实现

我们针对需要新增的gRPC接口改写protobuf接口文件:

    syntax = "proto3";
    
    option java_multiple_files = true;
    option java_outer_classname = "NameNodeServiceProto";
    
    service NameNodeService {
        rpc register(RegisterRequest) returns (RegisterResponse){}
        rpc heartbeat(HeartbeatRequest) returns (HeartbeatResponse){}
        rpc mkdir(MkDirRequest) returns (MkDirResponse){}
        rpc shutdown(ShutdownRequest) returns (ShutdownResponse){}
        rpc fetchEditsLog(FetchEditsLogRequest) returns (FetchEditsLogResponse){}
        rpc updateCheckpointTxid(UpdateCheckpointTxidRequest) returns (UpdateCheckpointTxidResponse){}
        rpc createFile(CreateFileRequest) returns (CreateFileResponse){}
        rpc allocateDataNodes(AllocateDataNodesRequest) returns (AllocateDataNodesResponse){}
        rpc getDataNodeForFile(GetDataNodeForFileRequest) returns (GetDataNodeForFileResponse){}
        rpc fullyReportDataNodeInfo(FullyReportRequest) returns (FullyReportResponse){}
        rpc deltaReportDataNodeInfo(DeltaReportRequest) returns (DeltaReportResponse){}
    }
    
    message RegisterRequest{
        string ip  = 1;
        string hostname  = 2;
        int32 nioPort = 3;
    }
    message RegisterResponse{
        int32 status  = 1;
    }
    message HeartbeatRequest{
        string ip  = 1;
        string hostname  = 2;
        int32 nioPort = 3;
    }
    message HeartbeatResponse{
        int32 status  = 1;
    }
    message MkDirRequest{
        string path  = 1;
    }
    message MkDirResponse{
        int32 status  = 1;
    }
    message ShutdownRequest{
        int32 code  = 1;
    }
    message ShutdownResponse{
        int32 status  = 1;
    }
    message FetchEditsLogRequest{
        int64 syncedTxid  = 1;
    }
    message FetchEditsLogResponse{
        string editsLog  = 1;
    }
    message UpdateCheckpointTxidRequest{
        int64 txid  = 1;
    }
    message UpdateCheckpointTxidResponse{
        int32 status  = 1;
    }
    message CreateFileRequest{
        string filename  = 1;
    }
    message CreateFileResponse{
        int32 status  = 1;
    }
    message AllocateDataNodesRequest{
        string filename  = 1;
        int64 filesize  = 2;
    }
    message AllocateDataNodesResponse{
        int32 status  = 1;
        string datanodes = 2;
    }
    message GetDataNodeForFileRequest{
        string filename  = 1;
    }
    message GetDataNodeForFileResponse{
        int32 status  = 1;
        string datanode = 2;
    }
    message FullyReportRequest{
        string filenameList  = 1;
        int64 storedDataSize  = 2;
        string hostname = 3;
        string ip = 4;
    }
    message FullyReportResponse{
        int32 status  = 1;
    }
    message DeltaReportRequest{
        string filename  = 1;
        int64 filesize  = 2;
        string hostname = 3;
        string ip = 4;
    }
    message DeltaReportResponse{
        int32 status  = 1;
    }

接着,在dfs-client工程中,新增createFileallocateDataNodesgetDataNodeForFile接口调用实现:

    // NameNodeRpcClient.java
    
    /**
     * 创建文件
     */
    @Override
    public void createFile(CreateFileRequest request, StreamObserver<CreateFileResponse> responseObserver) {
        try {
            CreateFileResponse response = null;
            if (!isRunning) {
                response = CreateFileResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
            } else {
                String filename = request.getFilename();
                Boolean success = namesystem.createFile(filename);
                if (success) {
                    response = CreateFileResponse.newBuilder().setStatus(STATUS_SUCCESS).build();
                } else {
                    response = CreateFileResponse.newBuilder().setStatus(STATUS_FAILURE).build();
                }
            }
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 获取可供上传的DataNode节点及副本
     */
    @Override
    public void allocateDataNodes(AllocateDataNodesRequest request, StreamObserver<AllocateDataNodesResponse> responseObserver) {
        try {
            AllocateDataNodesResponse response = null;
            if (!isRunning) {
                response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
            } else {
                long fileSize = request.getFilesize();
                List<DataNodeInfo> datanodes = datanodeManager.allocateDataNodes(fileSize);
                String datanodesJson = JSONArray.toJSONString(datanodes);
                response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SUCCESS).setDatanodes(datanodesJson).build();
            }
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 下载文件
     */
    @Override
    public void getDataNodeForFile(GetDataNodeForFileRequest request, StreamObserver<GetDataNodeForFileResponse> responseObserver) {
        String filename = request.getFilename();
        DataNodeInfo datanode = datanodeManager.getDataNodeForFile(filename);
    
        GetDataNodeForFileResponse response = GetDataNodeForFileResponse.newBuilder()
                .setDatanode(JSONObject.toJSONString(datanode))
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

3.1 上传DataNode分配

NameNode侧对于接受到的文件上传请求,需要记录元数据,并分配可用的DataNode双副本:

    // NameNodeServiceImpl.java
    
    /**
     * 创建文件
     */
    @Override
    public void createFile(CreateFileRequest request, StreamObserver<CreateFileResponse> responseObserver) {
        try {
            CreateFileResponse response = null;
            if (!isRunning) {
                response = CreateFileResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
            } else {
                String filename = request.getFilename();
                Boolean success = namesystem.createFile(filename);
                if (success) {
                    response = CreateFileResponse.newBuilder().setStatus(STATUS_SUCCESS).build();
                } else {
                    response = CreateFileResponse.newBuilder().setStatus(STATUS_FAILURE).build();
                }
            }
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    /**
     * 获取可供上传的DataNode节点及副本
     */
    @Override
    public void allocateDataNodes(AllocateDataNodesRequest request, StreamObserver<AllocateDataNodesResponse> responseObserver) {
        try {
            AllocateDataNodesResponse response = null;
            if (!isRunning) {
                response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
            } else {
                long fileSize = request.getFilesize();
                List<DataNodeInfo> datanodes = datanodeManager.allocateDataNodes(fileSize);
                String datanodesJson = JSONArray.toJSONString(datanodes);
                response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SUCCESS).setDatanodes(datanodesJson).build();
            }
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

上述代码分别时调用了FSNameSystem.createFileDataNodeManager.allocateDataNodes完成对应的功能:

    // DataNodeManager.java
    
    /**
     * 获取可供上传的DataNode节点及副本
     */
    @Override
    public void allocateDataNodes(AllocateDataNodesRequest request, StreamObserver<AllocateDataNodesResponse> responseObserver) {
        try {
            AllocateDataNodesResponse response = null;
            if (!isRunning) {
                response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
            } else {
                long fileSize = request.getFilesize();
                List<DataNodeInfo> datanodes = datanodeManager.allocateDataNodes(fileSize);
                String datanodesJson = JSONArray.toJSONString(datanodes);
                response = AllocateDataNodesResponse.newBuilder()
                    .setStatus(STATUS_SUCCESS).setDatanodes(datanodesJson).build();
            }
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

FSNameSystem.createFile属于维护文件目录树的操作,其中包含了记录edits log日志,BackupNode会从NameNode同步日志并进行回放,这块内容我就不再赘述了,读者可以自行研读代码:

    // FSNameSystem.java
    
    /**
     * 创建文件
     */
    public Boolean createFile(String filename) throws Exception {
        if (!directory.createFile(filename)) {
            return false;
        }
        editlog.logEdit(EditLogFactory.createFile(filename));
        return true;
    }

我们重点来看NameNode是如何选择供上传的DataNode节点的。这一工作由DataNodeManager组件完成,DataNodeManager在内存中维护整个DataNode集群的信息,然后根据数据存储量选择两个数据量最小的节点返回:

    public class DataNodeManager {
        // 集群中所有的DataNode信息,Key为IP-HOSTNAME
        private Map<String, DataNodeInfo> datanodes = new ConcurrentHashMap<>();
    
        /**
         * 获取可用的DataNode节点
         */
        public List<DataNodeInfo> allocateDataNodes(long fileSize) {
            synchronized (this) {
                List<DataNodeInfo> datanodeList = new ArrayList<>();
                for (DataNodeInfo datanode : datanodes.values()) {
                    datanodeList.add(datanode);
                }
                Collections.sort(datanodeList);
    
                // 选择存储数据最少的头两个datanode出来
                List<DataNodeInfo> selectedDatanodes = new ArrayList<>();
                if (datanodeList.size() >= 2) {
                    selectedDatanodes.add(datanodeList.get(0));
                    selectedDatanodes.add(datanodeList.get(1));
                    // 增加节点存储数据的大小
                    datanodeList.get(0).addStoredDataSize(fileSize);
                    datanodeList.get(1).addStoredDataSize(fileSize);
                }
    
                return selectedDatanodes;
            }
        }
        //...
    }

3.2 下载DataNode分配

再来看NameNode侧是如何处理下载文件的RPC请求的:

    // NameNodeServiceImpl.java
    
    /**
     * 下载文件
     */
    @Override
    public void getDataNodeForFile(GetDataNodeForFileRequest request, StreamObserver<GetDataNodeForFileResponse> responseObserver) {
        String filename = request.getFilename();
        DataNodeInfo datanode = datanodeManager.getDataNodeForFile(filename);
        GetDataNodeForFileResponse response = GetDataNodeForFileResponse.newBuilder()
                .setDatanode(JSONObject.toJSONString(datanode))
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

上述代码还是调用了DatanodeManager的接口来完成功能,这里我用了读写锁,以提升并发处理性能:

    // DatanodeManager
    
    // 文件与DataNode的映射,Key为文件名
    private Map<String, List<DataNodeInfo>> datanodeMappedByFile = new HashMap<>();
    
    /**
     * 获取可供下载的DataNode节点
     */
    public DataNodeInfo getDataNodeForFile(String filename) {
        try {
            rrw.readLock().lock();
    
            List<DataNodeInfo> datanodes = datanodeMappedByFile.get(filename);
            int size = datanodes.size();
            // 随机选择一个
            Random random = new Random(System.currentTimeMillis());
            int index = random.nextInt(size);
            return datanodes.get(index);
        } finally {
            rrw.readLock().lock();
        }
    }

3.3 服务注册和心跳

我们需要对原来的服务注册和心跳接口进行改写,增加上送DataNode的nioPort信息,所谓nioPort,就是DataNode会在启动时开启一个NIO Server线程,用于实际接受客户端文件上传/下载的数据流。

我们先看下DataNode作为客户端侧的实现:

    // NameNodeRpcClient.java
    
    private NameNodeServiceGrpc.NameNodeServiceBlockingStub namenode;
    
    /**
     * 向NameNode进行一次注册
     */
    public RegisterResponse register() {
        RegisterRequest request = RegisterRequest.newBuilder()
                .setIp(DATANODE_IP)
                .setHostname(DATANODE_HOSTNAME)
                .setNioPort(NIO_PORT)
                .build();
        RegisterResponse response = namenode.register(request);
        System.out.println("完成向NameNode的注册,响应消息为:" + response.getStatus());
    
        return response;
    }
    
    /**
     * 向NameNode发送一次心跳
     */
    public HeartbeatResponse heartbeat() {
        HeartbeatRequest request = HeartbeatRequest.newBuilder()
                .setIp(DATANODE_IP)
                .setHostname(DATANODE_HOSTNAME)
                .setNioPort(NIO_PORT)
                .build();
        return namenode.heartbeat(request);
    }

再来看NameNode侧的RPC服务实现:

    /**
     * NameNode的RPC服务接口
     */
    public class NameNodeServiceImpl extends NameNodeServiceGrpc.NameNodeServiceImplBase {
        // 成功
        public static final Integer STATUS_SUCCESS = 1;
        // 失败
        public static final Integer STATUS_FAILURE = 2;
        // 已停机
        public static final Integer STATUS_SHUTDOWN = 3;
    
        /**
         * DataNode注册
         */
        @Override
        public void register(RegisterRequest request, StreamObserver<RegisterResponse> responseObserver) {
            // 使用DataNodeManager组件完成DataNode注册
            Integer status = datanodeManager.register(request.getIp(), request.getHostname(), request.getNioPort());
            RegisterResponse response = RegisterResponse.newBuilder().setStatus(status).build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
    
        /**
         * DataNode心跳
         */
        @Override
        public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
            // 使用DataNodeManager组件完成DataNode心跳
            Integer status = datanodeManager.heartbeat(request.getIp(), request.getHostname(), request.getNioPort());
            HeartbeatResponse response = HeartbeatResponse.newBuilder().setStatus(status).build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
        //...
    }

上述代码使用组件DatanodeManager完成服务注册和心跳的功能,本质就是维护内存中的DataNode信息的Hash表:

    /**
     * 负责管理集群里的所有DataNode
     */
    public class DataNodeManager {
    
        // 首次注册
        public static final Integer REGISTER_FIRST = 10000;
    
        // 已经存在且注册成功
        public static final Integer REGISTER_SUCCESS_EXIST = 10001;
    
        // 首次心跳
        public static final Integer RENEW_FIRST = 10003;
    
        // 已经存在且心跳成功
        public static final Integer RENEW_SUCCESS_EXIST = 10004;
    
        // 集群中所有的DataNode信息,Key为IP-HOSTNAME
        private Map<String, DataNodeInfo> datanodes = new ConcurrentHashMap<>();
    
        // 文件与DataNode的映射,Key为文件名
        private Map<String, List<DataNodeInfo>> datanodeMappedByFile = new HashMap<>();
    
        /**
         * datanode注册
         */
        public Integer register(String ip, String hostname, int nioPort) {
            if (containDataNode(ip, hostname)) {
                System.out.println("当前已经存在这个DataNode了......");
                return REGISTER_SUCCESS_EXIST;
            }
            setDataNodeInfo(ip, hostname, new DataNodeInfo(ip, hostname, nioPort));
            System.out.println("DataNode注册:ip=" + ip + ",hostname=" + hostname + ", nioPort=" + nioPort);
            return REGISTER_FIRST;
        }
    
        /**
         * datanode心跳
         */
        public Integer heartbeat(String ip, String hostname, int nioPort) {
            DataNodeInfo datanode = getDataNodeInfo(ip, hostname);
            if (datanode == null) {
                return RENEW_FIRST;
            }
    
            datanode.setLatestHeartbeatTime(System.currentTimeMillis());
            System.out.println("DataNode发送心跳:ip=" + ip + ",hostname=" + hostname);
            return RENEW_SUCCESS_EXIST;
        }
    
        public DataNodeInfo getDataNodeInfo(String ip, String hostname) {
            return datanodes.get(ip + "-" + hostname);
        }
    
        public void setDataNodeInfo(String ip, String hostname, DataNodeInfo dataNodeInfo) {
            datanodes.put(ip + "-" + hostname, dataNodeInfo);
        }
    
        public Boolean containDataNode(String ip, String hostname) {
            return datanodes.containsKey(ip + "-" + hostname);
        }
        //...
    }

3.4 信息上报

DataNode的信息上报分为 全量上报增量上报 ,这部分功能我放到后面章节与DataNode一起统一讲解。本章,我们就简单看一下DataNode侧的gRPC调用实现:

    /**
     * 与NameNode进行RPC通信的组件
     */
    public class NameNodeRpcClient {
    
        private NameNodeServiceGrpc.NameNodeServiceBlockingStub namenode;
    
        public NameNodeRpcClient() {
            ManagedChannel channel = NettyChannelBuilder
                    .forAddress(NAMENODE_HOSTNAME, NAMENODE_PORT)
                    .negotiationType(NegotiationType.PLAINTEXT)
                    .build();
            this.namenode = NameNodeServiceGrpc.newBlockingStub(channel);
        }
    
        /**
         * 向NameNode全量上报元数据信息
         */
        public void fullyReportDataNodeInfo(DataNodeInfo dataNodeInfo) {
            if (dataNodeInfo == null) {
                System.out.println("当前没有存储任何文件,不需要全量上报.....");
                return;
            }
            FullyReportRequest request = FullyReportRequest.newBuilder()
                    .setIp(DATANODE_IP)
                    .setHostname(DATANODE_HOSTNAME)
                    .setFilenameList(JSONArray.toJSONString(dataNodeInfo.getFilenames()))
                    .setStoredDataSize(dataNodeInfo.getStoredDataSize())
                    .build();
            namenode.fullyReportDataNodeInfo(request);
            System.out.println("全量上报DataNode信息:" + dataNodeInfo);
        }
    
        /**
         * 向NameNode增量上报元数据信息
         */
        public void deltaReportDataNodeInfo(String filename, long filesize) {
            DeltaReportRequest request = DeltaReportRequest.newBuilder()
                    .setHostname(DATANODE_HOSTNAME)
                    .setIp(DATANODE_IP)
                    .setFilename(filename)
                    .setFilesize(filesize)
                    .build();
            namenode.deltaReportDataNodeInfo(request);
            System.out.println("增量上报DataNode信息:" + filename + ", " + filesize);
        }
        //...
    }

五、总结

本章,我对分布式文件系统的存储架构进行了讲解。我们在设计文件上传和下载功能时,需要考虑元数据管理、DataNode的可用性、数据传输方式、DataNode节点选取策略等各方面的内容。

下一章,我将对DataNode如何基于Java NIO完成文件的上传和下载进行讲解。

阅读全文