本章,我将对BackupNode的启停与元数据恢复机制进行讲解,主要分为两部分内容:
- BackupNode的元数据恢复机制;
- BackupNode的优雅停机机制。
一、元数据恢复
BackupNode会定期生成fsimage快照和checkpoint信息保存到磁盘上,同时定期从NameNode拉取Edits Log日志,如果BackupNode突然宕机,内存中的元数据就会丢失。
所以,BackupNode在启动时需要查找磁盘中的fsimage快照文件和checkpoint文件,恢复内存文件目录树,从checkpoint之后开始拉取edits log日志。
元数据由FSNameSystem管理,所以需要在FSNameSystem构造时进行元数据恢复:
public class FSNameSystem {
// 负责管理内存文件目录树的组件
private FSDirectory directory;
// Checkpoint检查点
private CheckPoint checkPoint = new CheckPoint();
// 是否正在从fsimage恢复元数据
private volatile boolean finishedRecover = false;
/**
* 恢复元数据
*/
public void recoverNamespace() {
try {
// 1.加载checkpoint文件
loadCheckpointInfo();
// 2.加载fsimage文件
loadFSImage();
// 3.置恢复元数据标志
finishedRecover = true;
} catch (Exception e) {
e.printStackTrace();
}
}
//...
}
来看FSNameSystem.recoverNamespace()
方法,主要分为三个步骤:
- 加载checkpoint文件,解析checkpoint信息并保存到内存中;
- 加载fsimage文件,并结合checkpoint信息恢复内存文件目录树;
- 置恢复元数据标志为完成。
1.1 加载checkpoint文件
先来看checkpoint文件的读取,checkpoint文件中包含了最近一次执行checkpoint的时间、最大txid、fsimage文件名,FSNameSystem读取到checkpoint文件后,会将其缓存到内存中:
/**
* 负责管理元数据的核心组件
*/
public class FSNameSystem {
// Checkpoint检查点
private CheckPoint checkPoint = new CheckPoint();
/**
* 加载checkpoint文件
*/
private void loadCheckpointInfo() throws Exception {
FileInputStream in = null;
FileChannel channel = null;
try {
String path = "C:\\Users\\Ressmix\\Desktop\\backupnode\\checkpoint-info.meta";
File file = new File(path);
if (!file.exists()) {
System.out.println("checkpoint info文件不存在,不进行恢复.......");
return;
}
in = new FileInputStream(path);
channel = in.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024);
int count = channel.read(buffer);
buffer.flip();
// 解析checkpoint
String checkpointInfo = new String(buffer.array(), 0, count);
long checkpointTime = Long.valueOf(checkpointInfo.split("_")[0]);
long syncedTxid = Long.valueOf(checkpointInfo.split("_")[1]);
String fsimageFile = checkpointInfo.split("_")[2];
System.out.println("恢复checkpoint time:" + checkpointTime + ", synced txid: " + syncedTxid + ", fsimage file: " + fsimageFile);
this.checkPoint.setCheckpointTime(checkpointTime);
this.checkPoint.setFsimageFile(fsimageFile);
this.checkPoint.setSyncedTxid(syncedTxid);
directory.setMaxTxid(syncedTxid);
} finally {
if (in != null) {
in.close();
}
if (channel != null) {
channel.close();
}
}
}
//...
}
1.2 加载fsimage文件
接着,FSNameSystem会读取fsimage文件,恢复内存文件目录树:
/**
* 负责管理元数据的核心组件
*/
public class FSNameSystem {
// 负责管理内存文件目录树的组件
private FSDirectory directory;
// Checkpoint检查点
private CheckPoint checkPoint = new CheckPoint();
// 是否正在从fsimage恢复元数据
private volatile boolean finishedRecover = false;
/**
* 加载fsimage文件到内存里来进行恢复
*/
private void loadFSImage() throws Exception {
FileInputStream in = null;
FileChannel channel = null;
try {
// 从checkpoint获取快照文件名
String fsimage = checkPoint.getFsimageFile();
String path = "C:\\Users\\Ressmix\\Desktop\\backupnode\\" + fsimage;
File file = new File(path);
if (!file.exists()) {
System.out.println("fsimage文件当前不存在,不进行恢复.......");
return;
}
in = new FileInputStream(path);
channel = in.getChannel();
ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
int count = channel.read(buffer);
buffer.flip();
String fsimageJson = new String(buffer.array(), 0, count);
System.out.println("恢复fsimage文件中的数据:" + fsimageJson);
FSDirectory.INode dirTree = JSONObject.parseObject(fsimageJson, new TypeReference<FSDirectory.INode>() {
});
System.out.println(dirTree);
// 恢复目录树
directory.setDirTree(dirTree);
} finally {
if (in != null) {
in.close();
}
if (channel != null) {
channel.close();
}
}
}
//...
}
1.3 置恢复元数据标志
最后,置恢复元数据标志:
public class FSNameSystem {
// 是否正在从fsimage恢复元数据
private volatile boolean finishedRecover = false;
/**
* 恢复元数据
*/
public void recoverNamespace() {
try {
loadCheckpointInfo();
loadFSImage();
// 置完成
finishedRecover = true;
} catch (Exception e) {
e.printStackTrace();
}
}
}
这样,当定期拉取Edits Log的线程和定期生成fsimage快照的线程在运行前就必须要先判断:如果BackipNode正在进行元数据恢复,则等待其完成。
/**
* edits log复制组件
*/
public class EditsLogFetcher extends Thread {
private FSNameSystem namesystem;
@Override
public void run() {
System.out.println("edits log同步线程已经启动......");
while (backupNode.isRunning()) {
try {
// 1.如果BackipNode正在进行元数据恢复,则等待其完成
if (!namesystem.isFinishedRecover()) {
System.out.println("当前还没完成元数据恢复,不进行editlog同步......");
Thread.sleep(1000);
continue;
}
//...
}
}
}
}
二、优雅停机
BackupNode在运行时,后台有两个线程:
public class BackupNode {
//...
public void start() {
// 定期拉取Edits Log线程
EditsLogFetcher editsLogFetcher = new EditsLogFetcher(this, namesystem, namenode);
editsLogFetcher.start();
// 定期生成fsimage快照线程
FSImageCheckPointer checkPointer = new FSImageCheckPointer(this, namesystem, namenode);
checkPointer.start();
}
}
我们需要分别对它们做一些特殊处理,防止BackupNode进程结束时它俩还在运行。
2.1 拉取Edits Log线程
EditsLogFetcher线程在运行时,需要基于BackupNode的isRunning
字段判断BackupNode的状态,
/**
* edits log复制组件
*/
public class EditsLogFetcher extends Thread {
private static final Integer BACKUP_NODE_FETCH_SIZE = 10;
private BackupNode backupNode;
private NameNodeRpcClient namenode;
private FSNameSystem namesystem;
public EditsLogFetcher(BackupNode backupNode, FSNameSystem namesystem, NameNodeRpcClient namenode) {
this.backupNode = backupNode;
this.namenode = namenode;
this.namesystem = namesystem;
}
@Override
public void run() {
System.out.println("edits log同步线程已经启动......");
while (backupNode.isRunning()) {
try {
// 1.如果BackipNode正在进行元数据恢复,则等待其完成
if (!namesystem.isFinishedRecover()) {
System.out.println("当前还没完成元数据恢复,不进行editlog同步......");
Thread.sleep(1000);
continue;
}
// 2.从上一次同步完成的txid开始进行日志拉取
long syncedTxid = namesystem.getSyncedTxid();
JSONArray editsLogs = namenode.fetchEditsLog(syncedTxid);
if (editsLogs.size() == 0) {
System.out.println("没有拉取到任何一条editslog,等待1秒后继续尝试拉取");
Thread.sleep(1000);
continue;
}
if (editsLogs.size() < BACKUP_NODE_FETCH_SIZE) {
Thread.sleep(1000);
System.out.println("拉取到的edits log不足10条数据,等待1秒后再次继续去拉取");
}
// 3.进行日志回放
for (int i = 0; i < editsLogs.size(); i++) {
JSONObject editsLog = editsLogs.getJSONObject(i);
System.out.println("拉取到一条editslog:" + editsLog.toJSONString());
String op = editsLog.getString("OP");
if (op.equals("MKDIR")) {
String path = editsLog.getString("PATH");
try {
namesystem.mkdir(editsLog.getLongValue("txid"), path);
} catch (Exception e) {
e.printStackTrace();
}
}
}
namenode.setRunning(true);
} catch (Exception e) {
namenode.setRunning(false);
}
}
}
}
注意,上述有一行代码:namenode.setRunning(true);
,目的是对NameNode的状态作检测,防止NameNode出现问题时,BackupNode的checkpoint机制还在不断向NameNode发送fsimage快照。
2.2 生成fsimage线程
生成fsimage线程的代码如下:
/**
* 生成fsimage快照文件的线程
*/
public class FSImageCheckPointer extends Thread {
private static final Integer CHECKPOINT_INTERVAL = 60 * 60 * 1000;
private BackupNode backupNode;
private FSNameSystem namesystem;
private NameNodeRpcClient namenode;
public FSImageCheckPointer(BackupNode backupNode, FSNameSystem namesystem, NameNodeRpcClient namenode) {
this.backupNode = backupNode;
this.namesystem = namesystem;
this.namenode = namenode;
}
@Override
public void run() {
System.out.println("fsimage checkpoint定时调度线程启动......");
while (backupNode.isRunning()) {
try {
// 1.如果BackupNode正在进行元数据恢复,则等待其完成
if (!namesystem.isFinishedRecover()) {
System.out.println("当前还没完成元数据恢复,不进行checkpoint......");
Thread.sleep(1000);
continue;
}
// 2.判断是否可以进行checkpoint
long now = System.currentTimeMillis();
long checkpointTime = namesystem.getCheckPoint().getCheckpointTime();
if (now - checkpointTime > CHECKPOINT_INTERVAL) {
// 如果NameNode状态不正常,则不触发checkpoint机制
if (!namenode.getRunning()) {
System.out.println("namenode当前无法访问,不执行checkpoint......");
continue;
}
System.out.println("准备执行checkpoint操作......");
// 触发checkpoint
doCheckpoint();
System.out.println("完成checkpoint操作......");
}
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
//...
}
三、总结
本章,我对BackupNode宕机恢复机制进行了讲解,实现思路与NameNode的元数据恢复机制相似。至此,我们的分布式文件系统的元数据管理功能就基本全部实现了,主要包括了以下内容:
- 基于纯内存的元数据信息维护;
- editslog机制;
- backup备份机制;
- 元数据恢复机制。