2023-07-30  阅读(278)
原文作者:Ressmix 原文地址:https://www.tpvlog.com/article/320

上一章,我对BackupNode的checkpoint机制进行了深入剖析,还遗留了fsimage快照传输没有讲解。BackupNode在生成完fsimage快照文件后,需要基于NIO网络通信将fsimage文件传输给NameNode。

关于NIO的网络通信机制,我在《透彻理解Kafka》系列中深入讲解过,本节我就来仿照Kafka的通信组件实现fsimage快照传输的BackupNode客户端/NameNode服务端的代码。

一、快照传输

首先,fsimage快照传输是通过BackupNode的 FSImageUploader 组件来完成,FSImageCheckPointer在执行checkpoint机制流程的第四步会将fsimage发送给NameNode:

    /**
     * 生成fsimage快照文件的线程
     */
    public class FSImageCheckPointer extends Thread {
        /**
         * 执行checkpoint机制
         */
        private void doCheckpoint() throws Exception {
            // 1.生成fsimage
            FSImage fsimage = namesystem.getFSImage();
            // 2.删除上一次的fsimage文件
            removeLastFSImageFile();
            // 3.保存fsimage文件到磁盘
            writeFSImageFile(fsimage);
            // 4.将fsimage文件发送给NameNode
            uploadFSImageFile(fsimage);
            // 5.将checkpoint信息发送给NameNode
            updateCheckpointTxid(fsimage);
            // 6.持久化checkpoint信息
            saveCheckpointInfo(fsimage);
        }
    
        /**
         * 发送fsimage文件
         */
        private void uploadFSImageFile(FSImage fsimage)  {
            FSImageUploader fsimageUploader = new FSImageUploader(fsimage);
            fsimageUploader.start();
        }
        //...
    }

1.1 Client端

我们采用Java NIO实现BackupNode作为Client端的代码,核心流程就是:

  1. 创建一个SocketChannel,与Server端的9000端口建立连接;
  2. 创建一个Selector,并注册到SocketChannel中;
  3. 监听channel上的各类事件,比如连接建立事件、可读事件等等,并进行相应的处理。
    /**
     * fsimage快照传输线程
     *
     * @author Ressmix
     */
    public class FSImageUploader extends Thread {
    
        private FSImage fsimage;
    
        public FSImageUploader(FSImage fsimage) {
            this.fsimage = fsimage;
        }
    
        @Override
        public void run() {
            SocketChannel channel = null;
            Selector selector = null;
            try {
                channel = SocketChannel.open();
                channel.configureBlocking(false);
                // 这里指定NameNode的地址
                channel.connect(new InetSocketAddress("localhost", 9000));
                selector = Selector.open();
                channel.register(selector, SelectionKey.OP_CONNECT);
    
                boolean uploading = true;
                while (uploading) {
                    selector.select();
                    Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                    while (keysIterator.hasNext()) {
                        SelectionKey key = (SelectionKey) keysIterator.next();
                        keysIterator.remove();
    
                        // 1.连接事件
                        if (key.isConnectable()) {
                            channel = (SocketChannel) key.channel();
                            if (channel.isConnectionPending()) {
                                channel.finishConnect();
                                // 上传fsimage
                                ByteBuffer buffer = ByteBuffer.wrap(fsimage.getFsimageJson().getBytes());
                                System.out.println("准备上传fsimage文件数据,大小为:" + buffer.capacity());
                                channel.write(buffer);
                            }
                            channel.register(selector, SelectionKey.OP_READ);
                        }
                        // 2.可读事件,即接收到了响应
                        else if (key.isReadable()) {
                            ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                            channel = (SocketChannel) key.channel();
                            int count = channel.read(buffer);
    
                            if (count > 0) {
                                System.out.println("上传fsimage文件成功,响应消息为:" +
                                        new String(buffer.array(), 0, count));
                                channel.close();
                                uploading = false;
                            }
                        }
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                //...省略关闭I/O相关代码
            }
        }
    }

可以看到,Java NIO的代码大部分都是一些 Boilerplate Code ,也就是说会遵照一定的模板,我们只需在相应的位置填充业务逻辑即可:

  1. 当BackupNode监听到channel上的连接事件后,就需要发送fsimage文件内容;
  2. 当BackupNode监听到channel上的可读事件后,就需要读取NameNode的响应。

1.2 Server端

NameNode启动后,需要基于Java NIO,作为Server端提供长连接服务,我通过组件 FSImageUploadServer 组件来实现:

    /**
     * NameNode启动类
     */
    public class NameNode {
    
        // fsimage同步组件
        private FSImageUploadServer fsimageUploadServer;
    
        public static void main(String[] args) throws Exception {
            NameNode namenode = new NameNode();
            namenode.init();
            namenode.start();
        }
    
        private void init() {
            this.fsimageUploadServer = new FSImageUploadServer();
        }
    
        private void start() throws Exception {
            this.fsimageUploadServer.start();
        }
    }

FSImageUploadServer首先创建了一个ServerSocketChannel对象,绑定到本地的9000端口,然后注册Selector并监听OP_ACCEPT事件。FSImageUploadServer线程启动后,当监听到Channel上有各类事件发生时,会调用handleRequest方法进行处理:

    /**
     * fsimage server
     */
    public class FSImageUploadServer extends Thread {
    
        private Selector selector;
    
        public FSImageUploadServer() {
            this.init();
        }
    
        private void init() {
            ServerSocketChannel serverSocketChannel = null;
            try {
                selector = Selector.open();
                serverSocketChannel = ServerSocketChannel.open();
                serverSocketChannel.configureBlocking(false);
                serverSocketChannel.socket().bind(new InetSocketAddress(9000), 100);
                serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void run() {
            System.out.println("FSImageUploadServer启动,监听9000端口......");
    
            while (true) {
                try {
                    selector.select();
                    // 监听到事件后,根据事件类型处理
                    Iterator<SelectionKey> keysIterator = selector.selectedKeys().iterator();
                    while (keysIterator.hasNext()) {
                        SelectionKey key = (SelectionKey) keysIterator.next();
                        keysIterator.remove();
                        try {
                            handleRequest(key);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                    }
                } catch (Throwable t) {
                    t.printStackTrace();
                }
            }
        }
        //...
    }

我们来看下FSImageUploadServer.handleRequest()方法,主要就是针对三种不同的事件类型作相应的处理,事件的处理流转有点类似状态机:

202307302136583841.png

    // FSImageUploadServer.java
    
    private void handleRequest(SelectionKey key) throws IOException {
        // 1.建立连接
        if (key.isAcceptable()) {
            handleConnectRequest(key);
        }
        // 2.接收到请求
        else if (key.isReadable()) {
            handleReadableRequest(key);
        }
        // 3.发送响应
        else if (key.isWritable()) {
            handleWritableRequest(key);
        }
    }
    
    /**
     * 处理BackupNode连接请求
     */
    private void handleConnectRequest(SelectionKey key) throws IOException {
        SocketChannel channel = null;
        try {
            ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
            channel = serverSocketChannel.accept();
            if (channel != null) {
                channel.configureBlocking(false);
                channel.register(selector, SelectionKey.OP_READ);
            }
        } catch (Exception e) {
            e.printStackTrace();
            if (channel != null) {
                channel.close();
            }
        }
    }
    
    /**
     * 处理发送fsimage文件的请求
     */
    private void handleReadableRequest(SelectionKey key) throws IOException {
        SocketChannel socketChannel = null;
        try {
            String fsimageFile = "C:\\Users\\Ressmix\\Desktop\\editslog\\fsimage.meta";
            RandomAccessFile raf = null;
            FileOutputStream os = null;
            FileChannel fileChannel = null;
    
            try {
                socketChannel = (SocketChannel) key.channel();
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int total = 0;
                int count = -1;
                if ((count = socketChannel.read(buffer)) > 0) {
                    File file = new File(fsimageFile);
                    if (file.exists()) {
                        file.delete();
                    }
    
                    raf = new RandomAccessFile(fsimageFile, "rw");
                    os = new FileOutputStream(raf.getFD());
                    fileChannel = os.getChannel();
    
                    total += count;
                    buffer.flip();
                    fileChannel.write(buffer);
                    buffer.clear();
                } else {
                    socketChannel.close();
                }
    
                while ((count = socketChannel.read(buffer)) > 0) {
                    total += count;
                    buffer.flip();
                    fileChannel.write(buffer);
                    buffer.clear();
                }
    
                if (total > 0) {
                    System.out.println("接收fsimage文件以及写入本地磁盘完毕......");
                    fileChannel.force(false);
                    socketChannel.register(selector, SelectionKey.OP_WRITE);
                }
            } finally {
                if (os != null) {
                    os.close();
                }
                if (raf != null) {
                    raf.close();
                }
                if (fileChannel != null) {
                    fileChannel.close();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
            if (socketChannel != null) {
                socketChannel.close();
            }
        }
    }
    
    /**
     * 处理返回响应给BackupNode
     */
    private void handleWritableRequest(SelectionKey key) throws IOException {
        SocketChannel channel = null;
        try {
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            buffer.put("SUCCESS".getBytes());
            buffer.flip();
            channel = (SocketChannel) key.channel();
            channel.write(buffer);
            System.out.println("fsimage上传完毕,返回响应SUCCESS给backupnode......");
            channel.register(selector, SelectionKey.OP_READ);
        } catch (Exception e) {
            e.printStackTrace();
            if (channel != null) {
                channel.close();
            }
        }
    }

二、NameNode宕机恢复

NameServer接收到fsimage后,就可以在启动时基于fsimage快照完成元数据的恢复了。这个工作由FSNameSystem组件来完成,整个流程分为三步:

  1. 加载fsimage:从本地磁盘查找fsimage文件,恢复内存文件目录树;
  2. 加载checkpoint信息:从本地磁盘查找checkpoint文件,恢复checkpoint信息;
  3. 回放edits log日志:读取checkpoint之后的edits log日志,进行回放。
    /**
     * 负责管理元数据的核心组件
     */
    public class FSNameSystem {
    
        // 负责管理内存文件目录树的组件
        private FSDirectory directory;
    
        // 负责管理edits log写入磁盘的组件
        private FSEditlog editlog;
    
        // 最近一次checkpoint更新到的txid
        private long checkpointTxid = 0L;
    
        public FSNameSystem() {
            this.directory = new FSDirectory();
            this.editlog = new FSEditlog();
            // 加载fsimage恢复元数据
            recoverNamespace();
        }
    
        /**
         * 基于fsimage快照和edits log恢复元数据
         */
        public void recoverNamespace() {
            try {
                // 1.加载fsimage文件
                loadFSImage();
                // 2.加载chekpoint信息
                loadCheckpointTxid();
                // 3.回放edits log
                loadEditLog();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        //...
    }

2.1 加载fsimage文件

我们先来看加载fsimage文件的流程,就是基于Java NIO读取磁盘上的fsimage快照文件,然后反序列化生成内存文件目录树:

    public class FSNameSystem {
    
        // 负责管理内存文件目录树的组件
        private FSDirectory directory;
    
        /**
         * 基于fsimage快照生成内存文件目录树
         */
        private void loadFSImage() throws Exception {
            FileInputStream in = null;
            FileChannel channel = null;
            try {
                // 这里可以基于配置文件读取
                String path = "C:\\Users\\Ressmix\\Desktop\\editslog\\fsimage.meta";
                File file = new File(path);
                if (!file.exists()) {
                    System.out.println("fsimage文件当前不存在,不进行恢复.......");
                    return;
                }
    
                in = new FileInputStream(path);
                channel = in.getChannel();
                ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                int count = channel.read(buffer);
    
                buffer.flip();
                String fsimageJson = new String(buffer.array(), 0, count);
                System.out.println("恢复fsimage文件中的数据:" + fsimageJson);
    
                FSDirectory.INode dirTree = JSONObject.parseObject(fsimageJson, FSDirectory.INode.class);
                // 生成目录树
                directory.setDirTree(dirTree);
            } finally {
                if (in != null) {
                    in.close();
                }
                if (channel != null) {
                    channel.close();
                }
            }
        }
        //...
    }

2.2 加载checkpoint信息

我们再来看加载checkpoint信息的流程,我在上一章讲过,BackupNode会通过RPC将checkpoint信息发送给NameNode。NameNode先在内存中维护checkpoint的txid信息,当停机时会将checkpoint信息持久化到磁盘上。

所以,加载checkpoint信息的流程就是读取磁盘上的checkpoint文件,然后恢复内存中的checkpoint的txid信息:

    /**
     * 负责管理元数据的核心组件
     */
    public class FSNameSystem {
    
        // 最近一次checkpoint对应的txid
        private long checkpointTxid = 0L;
    
        /**
         * 加载checkpoint txid
         */
        private void loadCheckpointTxid() throws Exception {
            FileInputStream in = null;
            FileChannel channel = null;
            try {
                String path = "C:\\Users\\Ressmix\\Desktop\\editslog\\checkpoint-txid.meta";
                File file = new File(path);
                if (!file.exists()) {
                    System.out.println("checkpoint txid文件不存在,不进行恢复.......");
                    return;
                }
    
                in = new FileInputStream(path);
                channel = in.getChannel();
    
                ByteBuffer buffer = ByteBuffer.allocate(1024);
                int count = channel.read(buffer);
    
                buffer.flip();
                long checkpointTxid = Long.valueOf(new String(buffer.array(), 0, count));
                System.out.println("恢复checkpoint txid:" + checkpointTxid);
                // 恢复内存中的txid,也就是最近一次执行checkpoint时,保存的最新edits log的txid
                this.checkpointTxid = checkpointTxid;
            } finally {
                if (in != null) {
                    in.close();
                }
                if (channel != null) {
                    channel.close();
                }
            }
        }
        //...
    }

2.3 回放edits log日志

最后,NameNode还需要进行edits log日志的回放,因为fsimage中只有checkpoint时间点之前的edits log,也就是小于等于checkpointTxid的日志。所以,NameNode还需要读取磁盘中checkpointTxid之后的edits log进行回放:

    /**
     * 负责管理元数据的核心组件
     */
    public class FSNameSystem {
    
        // 负责管理内存文件目录树的组件
        private FSDirectory directory;
    
        // 负责管理edits log写入磁盘的组件
        private FSEditlog editlog;
    
        // 最近一次checkpoint对应的txid
        private long checkpointTxid = 0L;
    
        /**
         * 加载和回放editlog
         */
        private void loadEditLog() throws Exception {
            // 这里可以基于配置文件读取
            File dir = new File("C:\\Users\\Ressmix\\Desktop\\editslog\\");
            if (!dir.exists() || dir.listFiles() == null) {
                return;
            }
    
            List<File> files = new ArrayList<>();
            for (File file : dir.listFiles()) {
                if (file.getName().contains("edits")) {
                    files.add(file);
                }
            }
    
            // 按edits log的文件名从小到大排序
            Collections.sort(files, new Comparator<File>() {
                @Override
                public int compare(File o1, File o2) {
                    Integer o1StartTxid = Integer.valueOf(o1.getName().split("-")[1]);
                    Integer o2StartTxid = Integer.valueOf(o2.getName().split("-")[1]);
                    return o1StartTxid - o2StartTxid;
                }
            });
    
            if (files == null || files.size() == 0) {
                System.out.println("当前没有任何editlog文件,不进行恢复......");
                return;
            }
    
            for (File file : files) {
                if (file.getName().contains("edits")) {
                    System.out.println("准备恢复editlog文件中的数据:" + file.getName());
    
                    String[] splitedName = file.getName().split("-");
                    long startTxid = Long.valueOf(splitedName[1]);
                    long endTxid = Long.valueOf(splitedName[2].split("[.]")[0]);
    
                    // 如果是checkpointTxid之后的那些editlog都要加载出来
                    if (endTxid > checkpointTxid) {
                        String currentEditsLogFile = "C:\\Users\\Ressmix\\Desktop\\editslog\\edits-"
                                + startTxid + "-" + endTxid + ".log";
                        List<String> editsLogs = Files.readAllLines(Paths.get(currentEditsLogFile),
                                StandardCharsets.UTF_8);
    
                        for (String editLogJson : editsLogs) {
                            JSONObject editLog = JSONObject.parseObject(editLogJson);
                            long txid = editLog.getLongValue("txid");
    
                            if (txid > checkpointTxid) {
                                System.out.println("准备回放editlog:" + editLogJson);
                                // 回放到内存里去
                                String op = editLog.getString("OP");
                                if (op.equals("MKDIR")) {
                                    String path = editLog.getString("PATH");
                                    directory.mkdir(path);
                                }
                            }
                        }
                    }
                }
            }
        }
        //...
    }

三、Edits log清理

最后,我这里再补充一下NameNode的Edits log清理机制。我们思考一下,既然NameNode已经接收到了fsimage快照,那么checkpoint之前的那些edits log是不是应该都清理掉已节省磁盘空间呢?

所以,我们需要有一个后台线程 EditLogCleaner 定期做Edits log的清理工作:

    public class FSNameSystem {
    
        // 负责清理Edits log的组件
        private EditLogCleaner editLogCleaner;
    
        public FSNameSystem() {
            this.directory = new FSDirectory();
            this.editlog = new FSEditlog();
            // 启动一个清理线程
            this.editLogCleaner = new EditLogCleaner(this);
            editLogCleaner.start();
            recoverNamespace();
        }
        //...
    }

3.1 EditLogCleaner

我们来看下EditLogCleaner的具体实现,思路非常简单,就是根据FSNameSystem保存的checkpoint信息,去磁盘文件上查找那些checkpoint之前的edits log,然后删除掉:

    /**
     * Edits Log清理线程
     */
    public class EditLogCleaner extends Thread {
        // 默认每隔30min清理一次
        private static final Long EDIT_LOG_CLEAN_INTERVAL = 30 * 1000L;
    
        private FSNameSystem nameSystem;
    
        public EditLogCleaner(FSNameSystem nameSystem) {
            super();
            this.nameSystem = nameSystem;
        }
    
        @Override
        public void run() {
            System.out.println("edits log日志文件后台清理线程启动......");
    
            while (true) {
                try {
                    Thread.sleep(EDIT_LOG_CLEAN_INTERVAL);
    
                    List<String> flushedTxids = nameSystem.getEditsLog().getFlushedTxids();
                    if (flushedTxids != null && flushedTxids.size() > 0) {
                        // 获取最近一次checkpoint
                        long checkpointTxid = nameSystem.getCheckpointTxid();
                        for (String flushedTxid : flushedTxids) {
                            long startTxid = Long.valueOf(flushedTxid.split("_")[0]);
                            long endTxid = Long.valueOf(flushedTxid.split("_")[1]);
    
                            // 在最近一次checkpoint之前的edits log文件都要删除
                            if (checkpointTxid >= endTxid) {
                                File file = new File("C:\\Users\\Ressmix\\Desktop\\editslog\\edits-"
                                        + startTxid + "-" + endTxid + ".log");
                                if (file.exists()) {
                                    file.delete();
                                    System.out.println("发现editlog日志文件不需要,进行删除:" + file.getPath());
                                }
                            }
                        }
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

四、总结

本章,我对fsimage传输与NameNode的宕机恢复元数据的机制进行了讲解。fsimage快照传输我使用了Java NIO的通信机制,NIO代码本身没什么难的,主要是一些Boilerplate Code,实际工作中开发得多了也就熟悉了。

阅读全文
  • 点赞