2023-07-30
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/320

BackupNode从NameNode复制了Edits Log后,会生成内存文件目录树,并且会定期生成一个fsimage文件快照。所谓fsimage文件快照,本质就是一个基于元数据的持久化文件:

202307302136536891.png

BackupNode每触发一次checkpoint机制,都会执行以下操作:

  1. 生成一个fsimage快照并持久化保存;
  2. 将fsimage文件发送给NameNode;
  3. 将本次checkpoint的信息发送给NameNode;
  4. 本地持久化保存checkpoint信息。

BackupNode会把checkpoint信息通过RPC方式发送给NameNode,NameNode记录最新的checkpoint信息后,就知道万一自己宕机了该读取哪个fsimage文件来恢复目录树。

本章,我就来实现分布式文件系统的checkpoint机制代码。

一、文件格式

首先我们来考虑下fsimage快照文件的格式,以及checkpoint文件的格式。

1.1 fsimage文件

fsimage文件的格式需要根据实际的业务情况来设计,比如Hadoop这种分布式存储系统,fsimage的设计是非常复杂的。但是,我们的分布式文件系统可以设计得简单一些,直接用一个JSON保存就可以了,也就是说将内存文件目录树的内容直接保存成JSON格式的文件。

我们定义一个Java Bean,来表示快照文件。注意,下面的maxTxid表示当前快照文件中的最新一条edits log对应的txid:

    public class FSImage {
        private long maxTxid;
        private String fsimageJson;
    
        public FSImage(long maxTxid, String fsimageJson) {
            this.maxTxid = maxTxid;
            this.fsimageJson = fsimageJson;
        }
        //...省略get/set
    }

很多童鞋会想,如果内存文件目录树中的内容特别多,会不会导致fsimage文件过大呢?事实上,我们可以自己估算一下内存文件目录树的大小,对于一个包含1亿个文件元数据信息的内存文件目录树,其大小也就是一两个GB。

NameNode重启后,从fsimage文件加载目录树到内存中,一般32G内存的机器就已经足够了。

如果文件确实很多,可以采用Hadoop HDFS提供的一个方案——Federation架构,其实就是把文件目录树给拆散,按照数据分片的方式存储到多个NameNode上,每个NameNode里存放一部分的目录和文件,保证每个NameNode内存里就几个GB的文件目录树。

1.2 checkpoint文件

checkpoint文件比较简单,checkpoint文件保存了BackupNode最近一次执行checkpoint机制的一些信息,包括:checkpoint时间点,fsimage中的最新edits log的txid,fsimage文件名等等。

    public class CheckPoint {
    
        // 最近一次checkpoint时间点
        private long checkpointTime = System.currentTimeMillis();
    
        // 最近一次checkpoint对应的目录树的最大txid
        private long syncedTxid = -1L;
    
        // 最近一次checkpoint中fsimage文件名
        private String fsimageFile = "";
    
        //...省略get/set
    }

我们直接用一个名为checkpoint-info.meta的文件来保存CheckPoint对象的JSON内容即可。

二、checkpoint线程

DataNode启动后,会启动一个后台线程 FSImageCheckPointer ,它需要定时触发checkpoint机制。我们设计成默认每隔1小时执行一次checkpoint机制,并且只保存最近一小时的edits log文件:

    /**
     * BackupNode启动类
     */
    public class BackupNode {
    
        private volatile Boolean isRunning = true;
        private FSNameSystem namesystem;
    
        public static void main(String[] args) {
            BackupNode backupNode = new BackupNode();
            backupNode.init();
            backupNode.start();
        }
    
        public void start() {
            //...
    
            // 定期执行checkpoint的线程
            FSImageCheckPointer checkPointer = new FSImageCheckPointer(this, namesystem, namenode);
            checkPointer.start();
        }
    
        public Boolean isRunning() {
            return isRunning;
        }
    }

我们先来看下checkpoint线程,它默认每隔60分钟会执行一次checkpoint机制:

    public class FSImageCheckPointer extends Thread {
        private static final Integer CHECKPOINT_INTERVAL = 60 * 60 * 1000;
    
        private BackupNode backupNode;
        private FSNameSystem namesystem;
        private NameNodeRpcClient namenode;
    
        public FSImageCheckPointer(BackupNode backupNode, FSNameSystem namesystem, 
                                   NameNodeRpcClient namenode) {
            this.backupNode = backupNode;
            this.namesystem = namesystem;
            this.namenode = namenode;
        }
    
        @Override
        public void run() {
            System.out.println("fsimage checkpoint定时调度线程启动......");
            while (backupNode.isRunning()) {
                try {
                    // 1.如果BackupNode正在进行元数据恢复,则等待其完成
                    if (!namesystem.isFinishedRecover()) {
                        System.out.println("当前还没完成元数据恢复,不进行checkpoint......");
                        Thread.sleep(1000);
                        continue;
                    }
    
                    // 2.判断是否可以进行checkpoint
                    long now = System.currentTimeMillis();
                    long checkpointTime = namesystem.getCheckPoint().getCheckpointTime();
                    if (now - checkpointTime > CHECKPOINT_INTERVAL) {
                        if (!namenode.getRunning()) {
                            System.out.println("namenode当前无法访问,不执行checkpoint......");
                            continue;
                        }
                        System.out.println("准备执行checkpoint操作......");
                        // 触发checkpoint
                        doCheckpoint();
                        System.out.println("完成checkpoint操作......");
                    }
                    Thread.sleep(1000);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    
        /**
         * 执行checkpoint机制
         */
        private void doCheckpoint() throws Exception {
            // 1.生成fsimage
            FSImage fsimage = namesystem.getFSImage();
            // 2.删除上一次的fsimage文件
            removeLastFSImageFile();
            // 3.保存fsimage文件到磁盘
            writeFSImageFile(fsimage);
            // 4.将fsimage文件发送给NameNode
            uploadFSImageFile(fsimage);
            // 5.将checkpoint信息发送给NameNode
            updateCheckpointTxid(fsimage);
            // 6.持久化checkpoint信息
            saveCheckpointInfo(fsimage);
        }
        //...
    }

FSNameSystem保存了最近的一次checkpoint的信息,在BackupNode启动时会执行元数据恢复,这些我后续章节再讲解。我们重点来看doCheckpoint中的六大步骤:

202307302136548752.png

2.1 生成fsimage

FSNameSystem 组件会基于内存文件目录树生成FSImage文件的内容,本质就是一个大的JSON串:

    /**
     * 负责管理元数据的核心组件
     *
     * @author Ressmix
     */
    public class FSNameSystem {
    
        // 负责管理内存文件目录树的组件
        private FSDirectory directory;
    
        // Checkpoint检查点
        private CheckPoint checkPoint = new CheckPoint();
    
        // 是否正在从fsimage恢复元数据
        private volatile boolean finishedRecover = false;
    
        public FSNameSystem() {
            this.directory = new FSDirectory();
            // 从fsimage恢复元数据
            recoverNamespace();
        }
    
        /**
         * 获取文件目录树的json
         */
        public FSImage getFSImage() {
            return directory.getFSImage();
        }
        //...
    }

来看FSDirectory.getFSImage()的实现,需要注意的是FSDirectory利用了一个 读写锁 来控制并发访问,maxTxid用于记录最新的edits log的日志ID:

    public class FSDirectory {
        private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
        private long maxTxid = 0;
    
        public void mkdir(Long txid, String path) {
            try {
                writeLock();
                maxTxid = txid;
                //...
            } finally {
                writeUnlock();
            }
        }
    
        /**
         * 以json格式获取到fsimage内存元数据
         */
        public FSImage getFSImage() {
            FSImage fsimage = null;
            try {
                readLock();
                String fsimageJson = JSONObject.toJSONString(dirTree);
                long lastedTxid = this.maxTxid;
                fsimage = new FSImage(lastedTxid, fsimageJson);
            } finally {
                readUnlock();
            }
            return fsimage;
        }
    }

2.2 清理fsimage

checkpoint线程需要清理上一次执行checkpoint生成的fsimage文件:

    // FSImageCheckPointer.java
    
    /**
     * 删除上一个fsimage磁盘文件
     */
    private void removeLastFSImageFile() {
        String lastFsimageFile = namesystem.getCheckPoint().getFsimageFile();
        File file = new File(lastFsimageFile);
        if (file.exists()) {
            file.delete();
        }
    }

2.3 持久化fsimage

checkpoint线程需要将最新fsimage内容保存到磁盘上,然后更新本次checkpoint的信息:

    // FSImageCheckPointer.java
    
    /**
     * 写入最新的fsimage文件
     */
    private void writeFSImageFile(FSImage fsimage) throws Exception {
        ByteBuffer buffer = ByteBuffer.wrap(fsimage.getFsimageJson().getBytes());
    
        // fsimage文件名
        String filename = "fsimage-" + fsimage.getMaxTxid() + ".meta";
        String fsimageFilePath = "C:\\Users\\Ressmix\\Desktop\\backupnode\\" + filename;
    
        RandomAccessFile file = null;
        FileOutputStream out = null;
        FileChannel channel = null;
        try {
            file = new RandomAccessFile(fsimageFilePath, "rw");
            out = new FileOutputStream(file.getFD());
            channel = out.getChannel();
            channel.write(buffer);
            channel.force(false);
        } finally {
            if (out != null) {
                out.close();
            }
            if (file != null) {
                file.close();
            }
            if (channel != null) {
                channel.close();
            }
        }
        // 更新checkpoint信息
        namesystem.getCheckPoint().setFsimageFile(filename);
        namesystem.getCheckPoint().setCheckpointTime(System.currentTimeMillis());
        namesystem.getCheckPoint().setSyncedTxid(fsimage.getMaxTxid());
    }

2.4 传输fsimage

接着,BackupNode需要将fsimage文件发送给NameNode:

    // FSImageCheckPointer.java
    
    /**
     * 发送fsimage文件
     */
    private void uploadFSImageFile(FSImage fsimage) throws Exception {
        FSImageUploader fsimageUploader = new FSImageUploader(fsimage);
        fsimageUploader.start();
    }

传输fsimage在实现上相对复杂一些,我完全基于Java NIO来实现,并且和NameNode的宕机恢复相关,所以我放到下一章《fsimage传输与宕机恢复》专门讲解。

2.5 发送checkpoint信息

随后,BackupNode还需要将本次checkpoint信息以RPC方式发送给NameNode,这样NameNode接收到fsimage文件后,才知道选择哪个fsimage文件进行恢复:

    // FSImageCheckPointer.java
    
    /**
     * 发送checkpoint信息
     */
    private void updateCheckpointTxid(FSImage fsimage) {
        namenode.updateCheckpointTxid(fsimage.getMaxTxid());
    }

我们来看下BackupNode作为RPC Client的实现,其实就是将checkpoint中的txid发送了出去:

    public class NameNodeRpcClient {
    
        private static final String NAMENODE_HOSTNAME = "localhost";
        private static final Integer NAMENODE_PORT = 50070;
    
        private volatile Boolean isRunning = true;
    
        private NameNodeServiceGrpc.NameNodeServiceBlockingStub namenode;
    
        public NameNodeRpcClient() {
            ManagedChannel channel = NettyChannelBuilder.forAddress(NAMENODE_HOSTNAME, NAMENODE_PORT)
                    .negotiationType(NegotiationType.PLAINTEXT).build();
            this.namenode = NameNodeServiceGrpc.newBlockingStub(channel);
        }
    
        /**
         * 发送checkpoint信息
         */
        public void updateCheckpointTxid(long txid) {
            UpdateCheckpointTxidRequest request = UpdateCheckpointTxidRequest.newBuilder()
                .setTxid(txid).build();
            namenode.updateCheckpointTxid(request);
        }
        //...
    }

再来看下NameNode作为RPC Server的实现:

    public class NameNodeServiceImpl extends NameNodeServiceGrpc.NameNodeServiceImplBase {
    
        public static final Integer STATUS_SUCCESS = 1;
        public static final Integer STATUS_FAILURE = 2;
        public static final Integer STATUS_SHUTDOWN = 3;
    
        // 负责管理元数据的核心组件
        private FSNameSystem namesystem;
    
        public NameNodeServiceImpl(FSNameSystem namesystem, DataNodeManager datanodeManager,
                                   EditLogReplicator replicator) {
            this.namesystem = namesystem;
            this.datanodeManager = datanodeManager;
            this.replicator = replicator;
        }
    
        /**
         * 更新checkpoint信息
         */
        @Override
        public void updateCheckpointTxid(UpdateCheckpointTxidRequest request,
                                         StreamObserver<UpdateCheckpointTxidResponse> responseObserver) {
            long txid = request.getTxid();
    
            // 保存checkpoint中的txid信息
            namesystem.setCheckpointTxid(txid);
    
            UpdateCheckpointTxidResponse response = UpdateCheckpointTxidResponse.newBuilder()
                .setStatus(1).build();
            responseObserver.onNext(response);
            responseObserver.onCompleted();
        }
    
        /**
         * 优雅停机
         */
        @Override
        public void shutdown(ShutdownRequest request, StreamObserver<ShutdownResponse> responseObserver) {
            isRunning = false;
            namesystem.flush();
            namesystem.saveCheckpointTxid();
            System.out.println("优雅关闭namenode......");
        }
    
        //...
    }

NameNode接受到RPC请求后,首先在内存中更新checkpoint信息,当NameNode停机时,会将checkpoint信息持久化到磁盘上。这样,当NameNode宕机恢复启动时,就可以读取checkpoint信息和fsimage文件恢复元数据:

    /**
     * 负责管理元数据的核心组件
     */
    public class FSNameSystem {
    
        // 最近一次checkpoint更新到的txid
        private long checkpointTxid = 0L;
    
        public FSNameSystem() {
            this.directory = new FSDirectory();
            this.editlog = new FSEditlog();
            // 基于fsimage宕机恢复元数据
            recoverNamespace();
        }
    
        public void setCheckpointTxid(long txid) {
            System.out.println("接收到checkpoint txid:" + txid);
            this.checkpointTxid = txid;
        }
    
        public long getCheckpointTxid() {
            return checkpointTxid;
        }
    
        /**
         * 将checkpoint txid保存到磁盘上去
         */
        public void saveCheckpointTxid() {
            String path = "C:\\Users\\Ressmix\\Desktop\\editslog\\checkpoint-txid.meta";
    
            RandomAccessFile raf = null;
            FileOutputStream out = null;
            FileChannel channel = null;
    
            try {
                File file = new File(path);
                if(file.exists()) {
                    file.delete();
                }
    
                ByteBuffer buffer = ByteBuffer.wrap(String.valueOf(checkpointTxid).getBytes());
                raf = new RandomAccessFile(path, "rw");
                out = new FileOutputStream(raf.getFD());
                channel = out.getChannel();
    
                channel.write(buffer);
                channel.force(false);
            } catch(Exception e) {
                e.printStackTrace();
            } finally {
                try {
                    if(out != null) {
                        out.close();
                    }
                    if(raf != null) {
                        raf.close();
                    }
                    if(channel != null) {
                        channel.close();
                    }
                } catch (Exception e2) {
                    e2.printStackTrace();
                }
            }
        }    
        //...
    }

2.6 持久化checkpoint信息

BackupNode将checkpoint信息发送给NameNode后,最后一步是持久化保存本次checkpoint信息:

    // FSImageCheckPointer.java
    
    /**
     * 持久化checkpoint信息
     */
    private void saveCheckpointInfo(FSImage fsimage) {
        String path = "C:\\Users\\Ressmix\\Desktop\\backupnode\\checkpoint-info.meta";
    
        RandomAccessFile raf = null;
        FileOutputStream out = null;
        FileChannel channel = null;
    
        try {
            File file = new File(path);
            if (file.exists()) {
                file.delete();
            }
    
            long time = namesystem.getCheckPoint().getCheckpointTime();
            long checkpointTxid = namesystem.getCheckPoint().getSyncedTxid();
            String lastFsimageFile = namesystem.getCheckPoint().getFsimageFile();
            ByteBuffer buffer = ByteBuffer.wrap(String.valueOf(time + "_" + checkpointTxid + "_" + lastFsimageFile).getBytes());
    
            raf = new RandomAccessFile(path, "rw");
            out = new FileOutputStream(raf.getFD());
            channel = out.getChannel();
            channel.write(buffer);
            channel.force(false);
            System.out.println("checkpoint信息持久化到磁盘文件......");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (out != null) {
                    out.close();
                }
                if (raf != null) {
                    raf.close();
                }
                if (channel != null) {
                    channel.close();
                }
            } catch (Exception e2) {
                e2.printStackTrace();
            }
        }
    }

三、总结

本章,我BackupNode的checkpoint机制进行了深入详尽的讲解。BackupNode主要依赖一个后台线程定期执行checkpoint机制。下一章,我将带大家实现遗留的fsimage文件传输与namenode的宕机恢复机制。

阅读全文