2023-07-30
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/320

本章开始,我将对分布式文件系统的高可用架构进行讲解。我们的分布式文件系统,主要考虑两个地方的高可用:

  • NameNode高可用 :NameNode负责管理整个分布式文件系统的元数据,我们使用BackupNode作为它的冷备节点。当NameNode故障时,虽然可以通过edits log和fsimage快照机制保证元数据不会丢失,但是无法实现故障的自动转移,需要手动进行故障节点切换;
  • DataNode高可用 :DataNode负责文件的实际存储,我们采用了双副本机制来保障DataNode的高可用,但是如果一个DataNode节点挂掉,可能会导致一批文件只有1个副本。另外,如果DataNode在文件传输过程中出现故障,也可能导致数据丢失。

高可用相关代码,存放在:https://gitee.com/ressmix/source-code/tree/master/5.dfs/3.HA。

一、概述

1.1 NameNode高可用

对于NameNode高可用,我已经采用冷备BackupNode节点实现了,如果需要进行故障转移,我们一般需要执行以下步骤:

  1. 从BackupNode获取一份最新的fsimage,放到NameNode机器所在的指定目录;
  2. 启动NameNode进程,从fsimage加载元数据。

显然,上述这种方式十分傻瓜,只能说在一些中小公司内部使用。要真正做到NameNode的故障自动转移,我们一般需要引入Zookeeper这类分布式协调中间件,当NameNode启动后,自动将节点信息注册到Zookeeper中,其它的BackupNode和DataNode等统一从Zookeeper感知NameNode节点的变化。

本系列,我就不带大家去实现NameNode的故障自动转移了,因为涉及分布式注册中心的很多概念和底层原理的讲解。后续,我会开一个《透彻理解分布式注册中心》专栏,对Zookeeper的底层原理进行讲解,并自己动手实现一个注册中心。

1.2 DataNode高可用

DataNode高可用,在我们的 双副本 架构下,主要需要考虑以下三点:

  1. 如果DFS客户端在传输文件过程中,DataNode挂了,客户端该如何处理?
  2. DataNode宕机时,NameNode如何将它上面的文件分配给其它正常的DataNode节点?
  3. DataNode恢复后,NameNode如何让该节点删除上面的副本?

第一个问题属于 中断异常处理 ,通常处理方式是,客户端重新向NameNode请求获取一个可用的DataNode,然后再次进行传输,我将在下一章讲解具体的处理流程。

第二、三个问题的本质是 文件副本重分配 ,本章我就对该问题的处理进行讲解。

二、文件副本重分配

我们先来看下副本重分配的整体处理流程:

  1. 首先,NameNode需要在内存中记录每个DataNode包含哪些文件,这一过程可以通过增量/全量信息上报完成;
  2. 接着,NameNode在检测DataNode的心跳时,如果发现某个DataNode失活了,就需要删除内存中该节点的元数据,同时对该节点上的每个文件都创建一个副本分配任务,任务包含源DataNode、目标DataNode、文件元数据等信息;
  3. 然后,NameNode将副本分配任务扔到内存队列中,当各个DataNode发送心跳时,会获取目标节点为自身的任务,然后执行文件复制,复制完成后进行一次增量信息上报;
  4. 最后,当那个宕机的DataNode恢复后,会向NameNode进行注册并全量上报自身信息,此时NameNode发现它的某个文件已经存在于其它的两个副本中了,就要求该DataNode删除自身的副本文件。

202307302137278721.png

接着,我们来实现上述重分配的代码。

2.1 创建重分配任务

首先,我们需要改造下NameNode的心跳检测线程——DataNodeAliveMonitor。当DataNodeAliveMonitor检测有DataNode失活后,需要进行以下处理:

  1. 遍历该DataNode上的所有文件,为每个文件创建一个副本复制任务;

    • 首先,确认执行复制任务的源数据节点;
    • 接着,确认执行复制任务的目标数据节点;
    • 最后,将副本复制任务扔到目标数据节点内部的一个队列中缓存。
  2. 从DataNode集群元数据中删除该失活DataNode节点。

    // DataNodeAliveMonitor.java
    
    private class DataNodeAliveMonitor extends Thread {
    @Override
    public void run() {
        try {
            while (true) {
                List<DataNodeInfo> toRemoveDatanodes = new ArrayList<>();
                Iterator<DataNodeInfo> datanodesIterator = datanodes.values().iterator();
                DataNodeInfo datanode = null;
                while (datanodesIterator.hasNext()) {
                    datanode = datanodesIterator.next();
                    // 遍历保存的DataNode节点,如果超过90秒未上送心跳,则移除
                    if (System.currentTimeMillis() - datanode.getLatestHeartbeatTime() > 90 * 1000) {
                        toRemoveDatanodes.add(datanode);
                    }
                }
                if (!toRemoveDatanodes.isEmpty()) {
                    for (DataNodeInfo toRemoveDatanode : toRemoveDatanodes) {
                        System.out.println("数据节点【" + toRemoveDatanode + "】宕机,需要进行副本复制......");
                        // 1.创建一个副本复制任务
                        createLostReplicaTask(toRemoveDatanode);
                        // 2.从DataNode集群元数据中删除该节点
                        removeDeadDataNode(toRemoveDatanode);
                    }
                }
                // 每隔30秒检测一次
                Thread.sleep(30 * 1000);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

副本复制任务的创建,关键是要确定好源DataNode和目标DataNode:

    // DataNodeAliveMonitor.java
    
    private void createLostReplicaTask(DataNodeInfo deadDataNode) {
        List<String> files = fileMappedByDataNode.get(deadDataNode.getId());
        // 为每个文件创建一个副本复制任务
        for (String file : files) {
            String filename = file.split("_")[0];
            Long filesize = Long.valueOf(file.split("_")[1]);
            // 获取复制任务的源数据节点
            DataNodeInfo sourceDatanode = getReplicateSource(filename, deadDataNode);
            // 获取复制任务的目标数据节点
            DataNodeInfo destDatanode = getReplicateTarget(filesize, sourceDatanode, deadDataNode);
            // 创建复制任务
            ReplicateTask replicateTask = new ReplicateTask(filename, filesize, sourceDatanode,
                                                            destDatanode);
            // 将复制任务放到目标数据节点的任务队列里去
            destDatanode.addReplicateTask(replicateTask);
            System.out.println("为目标数据节点生成一个副本复制任务," + replicateTask);
        }
    }
    
    /**
     * 获取复制任务的源数据节点,即挂掉的DataNode的存活镜像节点
     */
    public DataNodeInfo getReplicateSource(String filename, DataNodeInfo deadDatanode) {
        DataNodeInfo replicateSource = null;
        try {
            rrw.readLock().lock();
            List<DataNodeInfo> replicas = datanodeMappedByFile.get(filename);
            for (DataNodeInfo replica : replicas) {
                if (!replica.equals(deadDatanode)) {
                    replicateSource = replica;
                    break;
                }
            }
        } finally {
            rrw.readLock().unlock();
        }
        return replicateSource;
    }
    
    /**
     * 获取复制任务的目标节点
     */
    private DataNodeInfo getReplicateTarget(Long filesize, DataNodeInfo sourceDatanode, DataNodeInfo deadDataNode) {
        synchronized (this) {
            DataNodeInfo targetDataNode = null;
            for (DataNodeInfo datanode : datanodes.values()) {
                // 目标节点不能是源节点,也不能是挂掉的节点
                if (!datanode.equals(sourceDatanode) && !datanode.equals(deadDataNode)) {
                    targetDataNode = datanode;
                    break;
                }
            }
            if (targetDataNode != null) {
                targetDataNode.addStoredDataSize(filesize);
            }
            return targetDataNode;
        }
    }

我们之前讲过,每个文件都会有两个副本,分别保存在两个DataNode节点上。那么当一个DataNode节点宕机后,就要选一个剩余的存活DataNode节点作为源节点。而目标节点的选取,只要保证不是源节点和挂掉的节点即可。

最后,重分配任务会缓存到目前节点的一个内存队列中:

    // 重分配任务
    public class ReplicateTask {
        private String filename;
        private Long fileLength;
        private DataNodeInfo sourceDatanode;
        private DataNodeInfo destDatanode;
        //...
    }
    public class DataNodeInfo implements Comparable<DataNodeInfo> {
        /**
         * 副本复制任务队列
         */
        private ConcurrentLinkedQueue<ReplicateTask> replicateTaskQueue = new ConcurrentLinkedQueue<>();
    
        public void addReplicateTask(ReplicateTask replicateTask) {
            replicateTaskQueue.offer(replicateTask);
        }
    
        public ReplicateTask pollReplicateTask() {
            if(!replicateTaskQueue.isEmpty()) {
                return replicateTaskQueue.poll();
            }
            return null;
        } 
        //...
    }

2.2 重分配任务下发

接着,我们来看副本重分配任务的下发过程。当DataNode向NameNode发送心跳时,NameNode会查看是否存在以当前DataNode作为目标节点的 副本分配任务 ,如果有,就通过心跳响应任务信息:

    // NameNodeServiceImpl.java
    
    /**
     * DataNode心跳
     */
    @Override
    public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
        // 使用DataNodeManager组件完成DataNode心跳
        boolean succ = datanodeManager.heartbeat(request.getIp(), request.getHostname(), request.getNioPort());
        HeartbeatResponse response = null;
        List<Command> cmdList = new ArrayList<>();
        if (succ) {
            // 1.判断当前目标节点是否包含副本复制任务
            DataNodeInfo datanode = datanodeManager.getDataNodeInfo(request.getIp(), request.getHostname());
            ReplicateTask replicateTask = null;
            while ((replicateTask = datanode.pollReplicateTask()) != null) {
                Command command = new Command(Command.REPLICATE);
                command.setContent(JSONObject.toJSONString(replicateTask));
                cmdList.add(command);
            }
            // 2.判断当前目标节点是否包含副本删除任务
            RemoveReplicaTask removeReplicaTask = null;
            while ((removeReplicaTask = datanode.pollRemoveReplicaTask()) != null) {
                Command removeReplicaCommand = new Command(Command.REMOVE_REPLICA);
                removeReplicaCommand.setContent(JSONObject.toJSONString(removeReplicaTask));
                cmdList.add(removeReplicaCommand);
            }
            System.out.println("接收到数据节点【" + datanode + "】的心跳,他的命令列表为:" + cmdList);
            response = HeartbeatResponse.newBuilder()
                    .setStatus(STATUS_SUCCESS)
                    .setCommands(JSONArray.toJSONString(cmdList))
                    .build();
        } else {
            // 服务注册任务
            Command regCmd = new Command(Command.REGISTER);
            // 全量上报任务
            Command fullReportCmd = new Command(Command.REPORT_COMPLETE_STORAGE_INFO);
            cmdList.add(regCmd);
            cmdList.add(fullReportCmd);
            response = HeartbeatResponse.newBuilder()
                    .setStatus(STATUS_FAILURE)
                    .setCommands(JSONArray.toJSONString(cmdList))
                    .build();
        }
        responseObserver.onNext(response);
        responseObserver.onCompleted();
    }

当DataNode收到响应后,需要根据任务信息执行副本重分配流程,即从源DataNode节点下载文件到本地保存:

    // LeaseManager.java
    
    /**
     * 负责心跳的线程
     */
    private class HeartbeatThread extends Thread {
        @Override
        public void run() {
            while (true) {
                try {
                    // 发送心跳
                    HeartbeatResponse response = rpcClient.heartbeat();
                    if (response.getStatus() == STATUS_SUCCESS) {
                        JSONArray commands = JSONArray.parseArray(response.getCommands());
                        if (commands.size() > 0) {
                            for (int i = 0; i < commands.size(); i++) {
                                JSONObject command = commands.getJSONObject(i);
                                Integer type = command.getInteger("type");
                                JSONObject task = command.getJSONObject("content");
    
                                if (type.equals(COMMAND_REPLICATE)) {
                                    replicateManager.addReplicateTask(task);
                                    System.out.println("接收副本复制任务," + command);
                                } else if (type.equals(COMMAND_REMOVE_REPLICA)) {
                                    System.out.println("接收副本删除任务," + command);
                                    String filename = task.getString("filename");
                                    String absoluteFilename = DataNodeInfo.getAbsoluteFilename(filename);
                                    File file = new File(absoluteFilename);
                                    if (file.exists()) {
                                        file.delete();
                                    }
                                }
                            }
                        }
                    }else if(response.getStatus()==STATUS_FAILURE){
                        JSONArray commands = JSONArray.parseArray(response.getCommands());
                        for(int i = 0; i < commands.size(); i++) {
                            JSONObject command = commands.getJSONObject(i);
                            Integer type = command.getInteger("type");
                            // 如果是注册的命令
                            if(type.equals(COMMAND_REGISTER)) {
                                rpcClient.register();
                            }
                            // 如果是全量上报的命令
                            else if(type.equals(COMMAND_REPORT_COMPLETE_STORAGE_INFO)) {
                                DataNodeInfo storageInfo = storageManager.getStorageInfo();
                                rpcClient.fullyReportDataNodeInfo(storageInfo);
                            }
                        }
                    }
                    Thread.sleep(30 * 1000);
                } catch (Exception e) {
                    System.out.println("当前NameNode不可用,心跳失败.......");
                }
            }
        }
    }

ReplicateManager是DataNode侧实际执行副本复制的一个管理类,它会先将任务扔到一个内存队列中,然后通过线程池并发获取任务执行:

    /**
     * 副本复制管理组件
     */
    public class ReplicateManager {
    
        public static final Integer REPLICATE_THREAD_NUM = 3;
    
        private DFSNIOClient nioClient = new DFSNIOClient();
    
        private NameNodeRpcClient namenodeRpcClient;
        /**
         * 副本复制任务队列
         */
        private ConcurrentLinkedQueue<JSONObject> replicateTaskQueue = new ConcurrentLinkedQueue<>();
    
        public ReplicateManager(NameNodeRpcClient namenodeRpcClient) {
            this.namenodeRpcClient = namenodeRpcClient;
            for (int i = 0; i < REPLICATE_THREAD_NUM; i++) {
                new ReplicateWorker().start();
            }
        }
    
        /**
         * 添加复制任务
         */
        public void addReplicateTask(JSONObject replicateTask) {
            replicateTaskQueue.offer(replicateTask);
        }
    
        /**
         * 副本复制线程
         */
        class ReplicateWorker extends Thread {
            @Override
            public void run() {
                while (true) {
                    FileOutputStream imageOut = null;
                    FileChannel imageChannel = null;
                    try {
                        JSONObject replicateTask = replicateTaskQueue.poll();
                        if (replicateTask == null) {
                            Thread.sleep(1000);
                            continue;
                        }
    
                        System.out.println("开始执行副本复制任务......");
    
                        // 解析复制任务
                        String filename = replicateTask.getString("filename");
                        Long fileLength = replicateTask.getLong("fileLength");
    
                        JSONObject sourceDatanode = replicateTask.getJSONObject("sourceDatanode");
                        String hostname = sourceDatanode.getString("hostname");
                        Integer nioPort = sourceDatanode.getInteger("nioPort");
    
                        // 跟源头数据接头通信读取图片过来
                        byte[] file = nioClient.readFile(hostname, nioPort, filename);
                        ByteBuffer fileBuffer = ByteBuffer.wrap(file);
                        System.out.println("从源头数据节点读取到图片,大小为:" + file.length + "字节");
    
                        // 根据文件的相对路径定位到绝对路径,写入本地磁盘文件中
                        String absoluteFilename = DataNodeInfo.getAbsoluteFilename(filename);
                        imageOut = new FileOutputStream(absoluteFilename);
                        imageChannel = imageOut.getChannel();
                        imageChannel.write(fileBuffer);
                        System.out.println("将图片写入本地磁盘文件,路径为:" + absoluteFilename);
    
                        // 进行增量上报
                        namenodeRpcClient.deltaReportDataNodeInfo(filename, fileLength);
                        System.out.println("向Master节点进行增量上报......");
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        //...
                    }
                }
            }
        }
    }

2.3 删除任务下发

最后,我们来看下删除任务下发的流程。当各个DataNode向NameNode进行增量或全量上报自身信息时,NameNode侧如果发现某个文件已经存在足够的副本,那么这个DataNode很可能是宕机后恢复的,所以会要求它删除自己的那个文件副本。

增量上报和全量上报的改造逻辑如下:

  1. 检查当前上报节点上送的文件信息,判断该文件是否已经存在于2个以上的DataNode中;
  2. 如果是,则创建一个RemoveReplicaTask任务;
  3. 当下次该DataNode发送心跳时,就会获取到该任务,完成本地文件的删除。
    // DataNodeManager.java
    
    
    /**
     * 增量上报
     */
    public void deltaReportDataNodeInfo(String ip, String hostname, String filename, Long fileLength) {
        try {
            rrw.writeLock().lock();
    
            List<DataNodeInfo> replicas = datanodeMappedByFile.get(filename);
            if (replicas == null) {
                replicas = new ArrayList<>();
                datanodeMappedByFile.put(filename, replicas);
            }
    
            DataNodeInfo datanode = this.getDataNodeInfo(ip, hostname);
    
            // 检查当前文件的副本数量是否超标
            if (replicas.size() == REPLICA_NUM) {
                // 减少这个节点上的存储数据量
                datanode.addStoredDataSize(-fileLength);
                // 生成文件删除任务
                RemoveReplicaTask removeReplicaTask = new RemoveReplicaTask(filename, datanode);
                datanode.addRemoveReplicaTask(removeReplicaTask);
            } else {
                // 如果副本数量未超标,才会将副本放入数据结构中
                replicas.add(datanode);
                // 维护每个数据节点拥有的文件副本
                List<String> files = fileMappedByDataNode.get(ip + "-" + hostname);
                if (files == null) {
                    files = new ArrayList<String>();
                    fileMappedByDataNode.put(ip + "-" + hostname, files);
                }
    
                files.add(filename + "_" + fileLength);
            }
            System.out.println("收到DataNode增量上报信息,当前的副本信息为:" + datanodeMappedByFile);
        } finally {
            rrw.writeLock().unlock();
        }
    }
    
    /**
     * 全量上报
     */
    public void fullyReportDataNodeInfo(String ip, String hostname, List<String> filenameList, Long storedDataSize) {
        DataNodeInfo datanode = this.getDataNodeInfo(ip, hostname);
        datanode.setStoredDataSize(storedDataSize);
        for (String filename : filenameList) {
            this.deltaReportDataNodeInfo(hostname, ip, filename, storedDataSize);
        }
    }

三、总结

本章,我对分布式文件系统的高可用架构进行了讲解,主要讲解了DataNode的高可用。DataNode在实现高可用时,最核心的就是它的文件副本重分配机制。

我们解决该问题的思路就是:NameNode对于异常DataNode节点,创建 文件副本转移任务文件副本删除任务 ,然后利用心跳,向DataNode下发这些任务,完成文件迁移和删除。

阅读全文