2023-07-30  阅读(230)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/320

本章,我将对DataNode的另一类高可用问题进行讲解,即DFS客户端在文件传输过程中,如果出现中断或异常该如何处理?

DFS客户端与DataNode在文件传输过程中,发生中断异常。此时,DFS客户端需要向NameNode上报异常节点的信息,然后NameNode再选择可用的DataNode让DFS重新传输。

202307302137336061.png

一、文件下载重传输

客户端对传输中断异常的处理主要分为两种情况: 文件下载重传输文件上传重传输 ,我们先来看下载重传输的情况。DFS客户端下载文件出现中断时,需要做两件事情:

  1. 告诉NameNode哪个DataNode出现了问题;
  2. NameNode重新选择一个可用DataNode返回给客户端,客户端重新下载。

1.1 客户端侧

我们需要改写DFS客户端侧的获取下载DataNode的RPC接口——getDataNodeForFile,在请求中加上一个excludedDataNodeId字段,表示当前故障的DataNode节点:

    message GetDataNodeForFileRequest{
        string filename  = 1;
        string excludedDataNodeId=2;
    }
    // NameNodeRpcClient.java
    
    /**
     * 发送请求获取指定文件所在的随机DataNode节点
     */
    public String getDataNodeForFile(String filename,String excludedDataNode) {
        GetDataNodeForFileRequest request = GetDataNodeForFileRequest.newBuilder()
                .setFilename(filename)
                .setExcludedDataNodeId(excludedDataNode)    // 异常DataNode
                .build();
        GetDataNodeForFileResponse response = namenode.getDataNodeForFile(request);
        return response.getDatanode();
    }

接着,DFS客户端在下载文件时,需要对中断异常做一些处理,即向新可用的DataNode进行文件传输:

    // FileSystemImpl.java
    public byte[] download(String filename) throws Exception {
        // 1.获取待下载文件对应的可用DataNode节点
        String datanode = rpcClient.getDataNodeForFile(filename, "");
        System.out.println("NameNode分配用来下载文件的数据节点:" + datanode);
    
        // 2.解析DataNode信息
        JSONObject jsonObject = JSONObject.parseObject(datanode);
        String hostname = jsonObject.getString("hostname");
        String ip = jsonObject.getString("ip");
        Integer nioPort = jsonObject.getInteger("nioPort");
    
        // 3.基于Java NIO下载文件
        byte[] file = null;
        try {
            file = DFSNIOClient.readFile(hostname, nioPort, filename);
        } catch (Exception ex) {
            // 出现异常,重新获取一个可用DataNode,上送异常的DataNode信息
            datanode = rpcClient.getDataNodeForFile(filename, ip + "-" + hostname);
            jsonObject = JSONObject.parseObject(datanode);
            hostname = jsonObject.getString("hostname");
            nioPort = jsonObject.getInteger("nioPort");
    
            try {
                file = DFSNIOClient.readFile(hostname, nioPort, filename);
            } catch (Exception e2) {
                throw e2;
            }
        }
        return file;
    }

1.2 NameNode侧

同时,我们需要对NameNode侧做一些改造,当RPC接口getDataNodeForFile上送了异常DataNode信息时,需要剔除该节点:

    // NameNodeServiceImpl.java
    
    /**
     * 下载文件
     */
    @Override
    public void getDataNodeForFile(GetDataNodeForFileRequest request, StreamObserver<GetDataNodeForFileResponse> responseObserver) {
        String filename = request.getFilename();
        // 异常DataNode
        String excludedDataNode = request.getExcludedDataNodeId();
        DataNodeInfo datanode = datanodeManager.getDataNodeForFile(filename,excludedDataNode);
    
        GetDataNodeForFileResponse response = GetDataNodeForFileResponse.newBuilder()
                .setDatanode(JSONObject.toJSONString(datanode))
                .build();
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

选择一个正常的DataNode节点:

    // DataNodeManager.java
    
    /**
     * 获取可供下载的DataNode节点
     */
    public DataNodeInfo getDataNodeForFile(String filename, String excludedDataNode) {
        try {
            rrw.readLock().lock();
            // 需要排除的节点
            DataNodeInfo excluded = datanodes.get(excludedDataNode);
            List<DataNodeInfo> datanodes = datanodeMappedByFile.get(filename);
            if (datanodes.size() == 1 && (datanodes.get(0).equals(excluded))) {
                return null;
            }
    
            int size = datanodes.size();
            // 随机选择一个非异常节点
            Random random = new Random(System.currentTimeMillis());
            while (true) {
                int index = random.nextInt(size);
                DataNodeInfo datanode = datanodes.get(index);
                if (!datanode.equals(excluded)) {
                    return datanode;
                }
            }
        } finally {
            rrw.readLock().lock();
        }
    }

二、文件上传重传输

文件上传中断的异常处理流程是类似的,DFS客户端上传文件出现中断时,需要做两件事情:

  1. 告诉NameNode哪个DataNode出现了问题;
  2. NameNode重新选择一个可用DataNode返回给客户端,客户端重新上传。

2.1 客户端侧

首先,我们需要改写DFS客户端侧获取上传DataNode的RPC接口——allocateDataNodes,在请求中加上一个excludedDataNodeId字段,表示当前故障的DataNode节点:

    message AllocateDataNodesRequest{
        string filename  = 1;
        int64 filesize  = 2;
        string excludedDataNodeId=3;
    }
    // NameNodeRpcClient.java
    
    /**
     * 分配双副本DataNode数据节点
     */
    public String allocateDataNodes(String filename, long fileSize,String excludedDataNodeId) {
        AllocateDataNodesRequest request = AllocateDataNodesRequest.newBuilder()
                .setFilename(filename)
                .setFilesize(fileSize)
                .setExcludedDataNodeId(excludedDataNodeId)
                .build();
        AllocateDataNodesResponse response = namenode.allocateDataNodes(request);
        return response.getStatus() == 1 ? response.getDatanodes() : null;
    }

接着,DFS客户端在上传文件时,需要对中断异常做一些处理,即上传异常节点信息,然后重新上传:

    public Boolean upload(byte[] file, String filename, long fileSize) throws Exception {
        // 1.RPC接口发送文件元数据
        if (!filename.startsWith(File.separator)) {
            filename = File.separator + filename;
        }
        if (!rpcClient.createFile(filename)) {
            return false;
        }
    
        // 2.RPC接口获取DataNode
        String datanodesJson = rpcClient.allocateDataNodes(filename, fileSize, "");
        System.out.println(datanodesJson);
        if (datanodesJson == null) {
            return false;
        }
    
        // 3.遍历DataNode,依次上传文件
        JSONArray datanodes = JSONArray.parseArray(datanodesJson);
        for (int i = 0; i < datanodes.size(); i++) {
            JSONObject datanode = datanodes.getJSONObject(i);
            String hostname = datanode.getString("hostname");
            String ip = datanode.getString("ip");
            int nioPort = datanode.getIntValue("nioPort");
            try {
                DFSNIOClient.sendFile(hostname, nioPort, file, filename, fileSize);
            } catch (Exception ex) {
                ex.printStackTrace();
                // 出现异常时,上送异常DataNode信息,并重新获取一个正常的DataNode
                String  reallocateDataNode= rpcClient.allocateDataNodes(filename, fileSize, ip + "-" + hostname);
                datanode = JSONObject.parseObject(reallocateDataNode);
                hostname = datanode.getString("hostname");
                nioPort = datanode.getIntValue("nioPort");
                DFSNIOClient.sendFile(hostname, nioPort, file, filename, fileSize);
            }
    
        }
        return true;
    }

2.2 NameNode侧

最后,需要对NameNode侧做一些改造,当RPC接口allocateDataNodes上送了异常DataNode信息时,需要剔除该节点:

    // NameNodeServiceImpl.java
    
    /**
     * 获取可供上传的DataNode节点及副本
     */
    @Override
    public void allocateDataNodes(AllocateDataNodesRequest request, StreamObserver<AllocateDataNodesResponse> responseObserver) {
        try {
            AllocateDataNodesResponse response = null;
            if (!isRunning) {
                response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
            } else {
                long fileSize = request.getFilesize();
                // 需要剔除的异常DataNode
                String excludedDataNodeId= request.getExcludedDataNodeId();
                List<DataNodeInfo> datanodes = datanodeManager.allocateDataNodes(fileSize,excludedDataNodeId);
                String datanodesJson = JSONArray.toJSONString(datanodes);
                response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SUCCESS).setDatanodes(datanodesJson).build();
            }
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

选择一个正常的DataNode节点:

    // DataNodeManager.java
    
    /**
     * 获取可供上传的DataNode节点及副本
     */
    public List<DataNodeInfo> allocateDataNodes(long fileSize, String excludedDataNodeId) {
        synchronized (this) {
            DataNodeInfo excludedDataNode = datanodes.get(excludedDataNodeId);
            List<DataNodeInfo> datanodeList = new ArrayList<>();
            for (DataNodeInfo datanode : datanodes.values()) {
                if (!datanode.equals(excludedDataNode)) {
                    datanodeList.add(datanode);
                }
            }
            Collections.sort(datanodeList);
    
            // 选择存储数据最少的头两个datanode出来
            List<DataNodeInfo> selectedDatanodes = new ArrayList<>();
            if (datanodeList.size() >= 2) {
                selectedDatanodes.add(datanodeList.get(0));
                selectedDatanodes.add(datanodeList.get(1));
                // 增加节点存储数据的大小
                datanodeList.get(0).addStoredDataSize(fileSize);
                datanodeList.get(1).addStoredDataSize(fileSize);
            } else if (datanodeList.size() == 1) {
                selectedDatanodes.add(datanodeList.get(0));
            }
            return selectedDatanodes;
        }
    }

三、总结

本章,我对DataNode高可用的另一类异常——文件传输中断问题进行了讲解,如果DFS客户端在文件传输过程中,DataNode出现了异常,那么应当重新向NameNode申请可用DataNode,并重新进行文件上传或下载。整个处理思路是比较简单的,主要是异常状况下上送异常DataNode的信息,以及NameNode对正常DataNode的选取。

阅读全文
  • 点赞