2023-07-30  阅读(248)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/320

本章,我将对BackupNode的启停与元数据恢复机制进行讲解,主要分为两部分内容:

  1. BackupNode的元数据恢复机制;
  2. BackupNode的优雅停机机制。

一、元数据恢复

BackupNode会定期生成fsimage快照和checkpoint信息保存到磁盘上,同时定期从NameNode拉取Edits Log日志,如果BackupNode突然宕机,内存中的元数据就会丢失。

所以,BackupNode在启动时需要查找磁盘中的fsimage快照文件和checkpoint文件,恢复内存文件目录树,从checkpoint之后开始拉取edits log日志。

元数据由FSNameSystem管理,所以需要在FSNameSystem构造时进行元数据恢复:

    public class FSNameSystem {
    
        // 负责管理内存文件目录树的组件
        private FSDirectory directory;
    
        // Checkpoint检查点
        private CheckPoint checkPoint = new CheckPoint();
    
        // 是否正在从fsimage恢复元数据
        private volatile boolean finishedRecover = false;
    
        /**
         * 恢复元数据
         */
        public void recoverNamespace() {
            try {
                // 1.加载checkpoint文件
                loadCheckpointInfo();
                // 2.加载fsimage文件
                loadFSImage();
                // 3.置恢复元数据标志
                finishedRecover = true;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //...
    }

来看FSNameSystem.recoverNamespace()方法,主要分为三个步骤:

  1. 加载checkpoint文件,解析checkpoint信息并保存到内存中;
  2. 加载fsimage文件,并结合checkpoint信息恢复内存文件目录树;
  3. 置恢复元数据标志为完成。

1.1 加载checkpoint文件

先来看checkpoint文件的读取,checkpoint文件中包含了最近一次执行checkpoint的时间、最大txid、fsimage文件名,FSNameSystem读取到checkpoint文件后,会将其缓存到内存中:

    /**
     * 负责管理元数据的核心组件
     */
    public class FSNameSystem {
        // Checkpoint检查点
        private CheckPoint checkPoint = new CheckPoint();
    
        /**
         * 加载checkpoint文件
         */
        private void loadCheckpointInfo() throws Exception {
            FileInputStream in = null;
            FileChannel channel = null;
            try {
                String path = "C:\\Users\\Ressmix\\Desktop\\backupnode\\checkpoint-info.meta";
                File file = new File(path);
                if (!file.exists()) {
                    System.out.println("checkpoint info文件不存在,不进行恢复.......");
                    return;
                }
    
                in = new FileInputStream(path);
                channel = in.getChannel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int count = channel.read(buffer);
                buffer.flip();
    
                // 解析checkpoint
                String checkpointInfo = new String(buffer.array(), 0, count);
                long checkpointTime = Long.valueOf(checkpointInfo.split("_")[0]);
                long syncedTxid = Long.valueOf(checkpointInfo.split("_")[1]);
                String fsimageFile = checkpointInfo.split("_")[2];
                System.out.println("恢复checkpoint time:" + checkpointTime + ", synced txid: " + syncedTxid + ", fsimage file: " + fsimageFile);
    
                this.checkPoint.setCheckpointTime(checkpointTime);
                this.checkPoint.setFsimageFile(fsimageFile);
                this.checkPoint.setSyncedTxid(syncedTxid);
                directory.setMaxTxid(syncedTxid);
            } finally {
                if (in != null) {
                    in.close();
                }
                if (channel != null) {
                    channel.close();
                }
            }
        }
        //...
    }

1.2 加载fsimage文件

接着,FSNameSystem会读取fsimage文件,恢复内存文件目录树:

    /**
     * 负责管理元数据的核心组件
     */
    public class FSNameSystem {
    
        // 负责管理内存文件目录树的组件
        private FSDirectory directory;
    
        // Checkpoint检查点
        private CheckPoint checkPoint = new CheckPoint();
    
        // 是否正在从fsimage恢复元数据
        private volatile boolean finishedRecover = false;
    
        /**
         * 加载fsimage文件到内存里来进行恢复
         */
        private void loadFSImage() throws Exception {
            FileInputStream in = null;
            FileChannel channel = null;
            try {
                // 从checkpoint获取快照文件名
                String fsimage = checkPoint.getFsimageFile();
                String path = "C:\\Users\\Ressmix\\Desktop\\backupnode\\" + fsimage;
                File file = new File(path);
                if (!file.exists()) {
                    System.out.println("fsimage文件当前不存在,不进行恢复.......");
                    return;
                }
    
                in = new FileInputStream(path);
                channel = in.getChannel();
                ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                int count = channel.read(buffer);
    
                buffer.flip();
                String fsimageJson = new String(buffer.array(), 0, count);
                System.out.println("恢复fsimage文件中的数据:" + fsimageJson);
    
                FSDirectory.INode dirTree = JSONObject.parseObject(fsimageJson, new TypeReference<FSDirectory.INode>() {
                });
                System.out.println(dirTree);
    
                // 恢复目录树
                directory.setDirTree(dirTree);
            } finally {
                if (in != null) {
                    in.close();
                }
                if (channel != null) {
                    channel.close();
                }
            }
        }
        //...
    }

1.3 置恢复元数据标志

最后,置恢复元数据标志:

    public class FSNameSystem {
    
        // 是否正在从fsimage恢复元数据
        private volatile boolean finishedRecover = false;
    
        /**
         * 恢复元数据
         */
        public void recoverNamespace() {
            try {
                loadCheckpointInfo();
                loadFSImage();
                // 置完成
                finishedRecover = true;
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

这样,当定期拉取Edits Log的线程和定期生成fsimage快照的线程在运行前就必须要先判断:如果BackipNode正在进行元数据恢复,则等待其完成。

    /**
     * edits log复制组件
     */
    public class EditsLogFetcher extends Thread {
        private FSNameSystem namesystem;
    
        @Override
        public void run() {
            System.out.println("edits log同步线程已经启动......");
    
            while (backupNode.isRunning()) {
                try {
                    // 1.如果BackipNode正在进行元数据恢复,则等待其完成
                    if (!namesystem.isFinishedRecover()) {
                        System.out.println("当前还没完成元数据恢复,不进行editlog同步......");
                        Thread.sleep(1000);
                        continue;
                    }
                    //...
                }
            }
        }
    }

二、优雅停机

BackupNode在运行时,后台有两个线程:

    public class BackupNode {
        //...
        public void start() {
            // 定期拉取Edits Log线程
            EditsLogFetcher editsLogFetcher = new EditsLogFetcher(this, namesystem, namenode);
            editsLogFetcher.start();
    
            // 定期生成fsimage快照线程
            FSImageCheckPointer checkPointer = new FSImageCheckPointer(this, namesystem, namenode);
            checkPointer.start();
        }
    }

我们需要分别对它们做一些特殊处理,防止BackupNode进程结束时它俩还在运行。

2.1 拉取Edits Log线程

EditsLogFetcher线程在运行时,需要基于BackupNode的isRunning字段判断BackupNode的状态,

    /**
     * edits log复制组件
     */
    public class EditsLogFetcher extends Thread {
        private static final Integer BACKUP_NODE_FETCH_SIZE = 10;
    
        private BackupNode backupNode;
        private NameNodeRpcClient namenode;
        private FSNameSystem namesystem;
    
        public EditsLogFetcher(BackupNode backupNode, FSNameSystem namesystem, NameNodeRpcClient namenode) {
            this.backupNode = backupNode;
            this.namenode = namenode;
            this.namesystem = namesystem;
        }
    
        @Override
        public void run() {
            System.out.println("edits log同步线程已经启动......");
    
            while (backupNode.isRunning()) {
                try {
                    // 1.如果BackipNode正在进行元数据恢复,则等待其完成
                    if (!namesystem.isFinishedRecover()) {
                        System.out.println("当前还没完成元数据恢复,不进行editlog同步......");
                        Thread.sleep(1000);
                        continue;
                    }
    
                    // 2.从上一次同步完成的txid开始进行日志拉取
                    long syncedTxid = namesystem.getSyncedTxid();
                    JSONArray editsLogs = namenode.fetchEditsLog(syncedTxid);
    
                    if (editsLogs.size() == 0) {
                        System.out.println("没有拉取到任何一条editslog,等待1秒后继续尝试拉取");
                        Thread.sleep(1000);
                        continue;
                    }
    
                    if (editsLogs.size() < BACKUP_NODE_FETCH_SIZE) {
                        Thread.sleep(1000);
                        System.out.println("拉取到的edits log不足10条数据,等待1秒后再次继续去拉取");
                    }
    
                    // 3.进行日志回放
                    for (int i = 0; i < editsLogs.size(); i++) {
                        JSONObject editsLog = editsLogs.getJSONObject(i);
                        System.out.println("拉取到一条editslog:" + editsLog.toJSONString());
                        String op = editsLog.getString("OP");
    
                        if (op.equals("MKDIR")) {
                            String path = editsLog.getString("PATH");
                            try {
                                namesystem.mkdir(editsLog.getLongValue("txid"), path);
                            } catch (Exception e) {
                                e.printStackTrace();
                            }
                        }
                    }
                    namenode.setRunning(true);
                } catch (Exception e) {
                    namenode.setRunning(false);
                }
            }
        }
    }

注意,上述有一行代码:namenode.setRunning(true);,目的是对NameNode的状态作检测,防止NameNode出现问题时,BackupNode的checkpoint机制还在不断向NameNode发送fsimage快照。

2.2 生成fsimage线程

生成fsimage线程的代码如下:

    /**
     * 生成fsimage快照文件的线程
     */
    public class FSImageCheckPointer extends Thread {
        private static final Integer CHECKPOINT_INTERVAL = 60 * 60 * 1000;
    
        private BackupNode backupNode;
        private FSNameSystem namesystem;
        private NameNodeRpcClient namenode;
    
        public FSImageCheckPointer(BackupNode backupNode, FSNameSystem namesystem, NameNodeRpcClient namenode) {
            this.backupNode = backupNode;
            this.namesystem = namesystem;
            this.namenode = namenode;
        }
    
        @Override
        public void run() {
            System.out.println("fsimage checkpoint定时调度线程启动......");
            while (backupNode.isRunning()) {
                try {
                    // 1.如果BackupNode正在进行元数据恢复,则等待其完成
                    if (!namesystem.isFinishedRecover()) {
                        System.out.println("当前还没完成元数据恢复,不进行checkpoint......");
                        Thread.sleep(1000);
                        continue;
                    }
    
                    // 2.判断是否可以进行checkpoint
                    long now = System.currentTimeMillis();
                    long checkpointTime = namesystem.getCheckPoint().getCheckpointTime();
                    if (now - checkpointTime > CHECKPOINT_INTERVAL) {
                        // 如果NameNode状态不正常,则不触发checkpoint机制
                        if (!namenode.getRunning()) {
                            System.out.println("namenode当前无法访问,不执行checkpoint......");
                            continue;
                        }
                        System.out.println("准备执行checkpoint操作......");
                        // 触发checkpoint
                        doCheckpoint();
                        System.out.println("完成checkpoint操作......");
                    }
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        //...
    }

三、总结

本章,我对BackupNode宕机恢复机制进行了讲解,实现思路与NameNode的元数据恢复机制相似。至此,我们的分布式文件系统的元数据管理功能就基本全部实现了,主要包括了以下内容:

  • 基于纯内存的元数据信息维护;
  • editslog机制;
  • backup备份机制;
  • 元数据恢复机制。
阅读全文
  • 点赞