BackupNode从NameNode复制了Edits Log后,会生成内存文件目录树,并且会定期生成一个fsimage文件快照。所谓fsimage文件快照,本质就是一个基于元数据的持久化文件:
BackupNode每触发一次checkpoint机制,都会执行以下操作:
- 生成一个fsimage快照并持久化保存;
- 将fsimage文件发送给NameNode;
- 将本次checkpoint的信息发送给NameNode;
- 本地持久化保存checkpoint信息。
BackupNode会把checkpoint信息通过RPC方式发送给NameNode,NameNode记录最新的checkpoint信息后,就知道万一自己宕机了该读取哪个fsimage文件来恢复目录树。
本章,我就来实现分布式文件系统的checkpoint机制代码。
一、文件格式
首先我们来考虑下fsimage快照文件的格式,以及checkpoint文件的格式。
1.1 fsimage文件
fsimage文件的格式需要根据实际的业务情况来设计,比如Hadoop这种分布式存储系统,fsimage的设计是非常复杂的。但是,我们的分布式文件系统可以设计得简单一些,直接用一个JSON保存就可以了,也就是说将内存文件目录树的内容直接保存成JSON格式的文件。
我们定义一个Java Bean,来表示快照文件。注意,下面的maxTxid
表示当前快照文件中的最新一条edits log对应的txid:
public class FSImage {
private long maxTxid;
private String fsimageJson;
public FSImage(long maxTxid, String fsimageJson) {
this.maxTxid = maxTxid;
this.fsimageJson = fsimageJson;
}
//...省略get/set
}
很多童鞋会想,如果内存文件目录树中的内容特别多,会不会导致fsimage文件过大呢?事实上,我们可以自己估算一下内存文件目录树的大小,对于一个包含1亿个文件元数据信息的内存文件目录树,其大小也就是一两个GB。
NameNode重启后,从fsimage文件加载目录树到内存中,一般32G内存的机器就已经足够了。
如果文件确实很多,可以采用Hadoop HDFS提供的一个方案——Federation架构,其实就是把文件目录树给拆散,按照数据分片的方式存储到多个NameNode上,每个NameNode里存放一部分的目录和文件,保证每个NameNode内存里就几个GB的文件目录树。
1.2 checkpoint文件
checkpoint文件比较简单,checkpoint文件保存了BackupNode最近一次执行checkpoint机制的一些信息,包括:checkpoint时间点,fsimage中的最新edits log的txid,fsimage文件名等等。
public class CheckPoint {
// 最近一次checkpoint时间点
private long checkpointTime = System.currentTimeMillis();
// 最近一次checkpoint对应的目录树的最大txid
private long syncedTxid = -1L;
// 最近一次checkpoint中fsimage文件名
private String fsimageFile = "";
//...省略get/set
}
我们直接用一个名为
checkpoint-info.meta
的文件来保存CheckPoint对象的JSON内容即可。
二、checkpoint线程
DataNode启动后,会启动一个后台线程 FSImageCheckPointer ,它需要定时触发checkpoint机制。我们设计成默认每隔1小时执行一次checkpoint机制,并且只保存最近一小时的edits log文件:
/**
* BackupNode启动类
*/
public class BackupNode {
private volatile Boolean isRunning = true;
private FSNameSystem namesystem;
public static void main(String[] args) {
BackupNode backupNode = new BackupNode();
backupNode.init();
backupNode.start();
}
public void start() {
//...
// 定期执行checkpoint的线程
FSImageCheckPointer checkPointer = new FSImageCheckPointer(this, namesystem, namenode);
checkPointer.start();
}
public Boolean isRunning() {
return isRunning;
}
}
我们先来看下checkpoint线程,它默认每隔60分钟会执行一次checkpoint机制:
public class FSImageCheckPointer extends Thread {
private static final Integer CHECKPOINT_INTERVAL = 60 * 60 * 1000;
private BackupNode backupNode;
private FSNameSystem namesystem;
private NameNodeRpcClient namenode;
public FSImageCheckPointer(BackupNode backupNode, FSNameSystem namesystem,
NameNodeRpcClient namenode) {
this.backupNode = backupNode;
this.namesystem = namesystem;
this.namenode = namenode;
}
@Override
public void run() {
System.out.println("fsimage checkpoint定时调度线程启动......");
while (backupNode.isRunning()) {
try {
// 1.如果BackupNode正在进行元数据恢复,则等待其完成
if (!namesystem.isFinishedRecover()) {
System.out.println("当前还没完成元数据恢复,不进行checkpoint......");
Thread.sleep(1000);
continue;
}
// 2.判断是否可以进行checkpoint
long now = System.currentTimeMillis();
long checkpointTime = namesystem.getCheckPoint().getCheckpointTime();
if (now - checkpointTime > CHECKPOINT_INTERVAL) {
if (!namenode.getRunning()) {
System.out.println("namenode当前无法访问,不执行checkpoint......");
continue;
}
System.out.println("准备执行checkpoint操作......");
// 触发checkpoint
doCheckpoint();
System.out.println("完成checkpoint操作......");
}
Thread.sleep(1000);
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 执行checkpoint机制
*/
private void doCheckpoint() throws Exception {
// 1.生成fsimage
FSImage fsimage = namesystem.getFSImage();
// 2.删除上一次的fsimage文件
removeLastFSImageFile();
// 3.保存fsimage文件到磁盘
writeFSImageFile(fsimage);
// 4.将fsimage文件发送给NameNode
uploadFSImageFile(fsimage);
// 5.将checkpoint信息发送给NameNode
updateCheckpointTxid(fsimage);
// 6.持久化checkpoint信息
saveCheckpointInfo(fsimage);
}
//...
}
FSNameSystem保存了最近的一次checkpoint的信息,在BackupNode启动时会执行元数据恢复,这些我后续章节再讲解。我们重点来看doCheckpoint
中的六大步骤:
2.1 生成fsimage
FSNameSystem 组件会基于内存文件目录树生成FSImage文件的内容,本质就是一个大的JSON串:
/**
* 负责管理元数据的核心组件
*
* @author Ressmix
*/
public class FSNameSystem {
// 负责管理内存文件目录树的组件
private FSDirectory directory;
// Checkpoint检查点
private CheckPoint checkPoint = new CheckPoint();
// 是否正在从fsimage恢复元数据
private volatile boolean finishedRecover = false;
public FSNameSystem() {
this.directory = new FSDirectory();
// 从fsimage恢复元数据
recoverNamespace();
}
/**
* 获取文件目录树的json
*/
public FSImage getFSImage() {
return directory.getFSImage();
}
//...
}
来看FSDirectory.getFSImage()
的实现,需要注意的是FSDirectory利用了一个 读写锁 来控制并发访问,maxTxid
用于记录最新的edits log的日志ID:
public class FSDirectory {
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private long maxTxid = 0;
public void mkdir(Long txid, String path) {
try {
writeLock();
maxTxid = txid;
//...
} finally {
writeUnlock();
}
}
/**
* 以json格式获取到fsimage内存元数据
*/
public FSImage getFSImage() {
FSImage fsimage = null;
try {
readLock();
String fsimageJson = JSONObject.toJSONString(dirTree);
long lastedTxid = this.maxTxid;
fsimage = new FSImage(lastedTxid, fsimageJson);
} finally {
readUnlock();
}
return fsimage;
}
}
2.2 清理fsimage
checkpoint线程需要清理上一次执行checkpoint生成的fsimage文件:
// FSImageCheckPointer.java
/**
* 删除上一个fsimage磁盘文件
*/
private void removeLastFSImageFile() {
String lastFsimageFile = namesystem.getCheckPoint().getFsimageFile();
File file = new File(lastFsimageFile);
if (file.exists()) {
file.delete();
}
}
2.3 持久化fsimage
checkpoint线程需要将最新fsimage内容保存到磁盘上,然后更新本次checkpoint的信息:
// FSImageCheckPointer.java
/**
* 写入最新的fsimage文件
*/
private void writeFSImageFile(FSImage fsimage) throws Exception {
ByteBuffer buffer = ByteBuffer.wrap(fsimage.getFsimageJson().getBytes());
// fsimage文件名
String filename = "fsimage-" + fsimage.getMaxTxid() + ".meta";
String fsimageFilePath = "C:\\Users\\Ressmix\\Desktop\\backupnode\\" + filename;
RandomAccessFile file = null;
FileOutputStream out = null;
FileChannel channel = null;
try {
file = new RandomAccessFile(fsimageFilePath, "rw");
out = new FileOutputStream(file.getFD());
channel = out.getChannel();
channel.write(buffer);
channel.force(false);
} finally {
if (out != null) {
out.close();
}
if (file != null) {
file.close();
}
if (channel != null) {
channel.close();
}
}
// 更新checkpoint信息
namesystem.getCheckPoint().setFsimageFile(filename);
namesystem.getCheckPoint().setCheckpointTime(System.currentTimeMillis());
namesystem.getCheckPoint().setSyncedTxid(fsimage.getMaxTxid());
}
2.4 传输fsimage
接着,BackupNode需要将fsimage文件发送给NameNode:
// FSImageCheckPointer.java
/**
* 发送fsimage文件
*/
private void uploadFSImageFile(FSImage fsimage) throws Exception {
FSImageUploader fsimageUploader = new FSImageUploader(fsimage);
fsimageUploader.start();
}
传输fsimage在实现上相对复杂一些,我完全基于Java NIO来实现,并且和NameNode的宕机恢复相关,所以我放到下一章《fsimage传输与宕机恢复》专门讲解。
2.5 发送checkpoint信息
随后,BackupNode还需要将本次checkpoint信息以RPC方式发送给NameNode,这样NameNode接收到fsimage文件后,才知道选择哪个fsimage文件进行恢复:
// FSImageCheckPointer.java
/**
* 发送checkpoint信息
*/
private void updateCheckpointTxid(FSImage fsimage) {
namenode.updateCheckpointTxid(fsimage.getMaxTxid());
}
我们来看下BackupNode作为RPC Client的实现,其实就是将checkpoint中的txid发送了出去:
public class NameNodeRpcClient {
private static final String NAMENODE_HOSTNAME = "localhost";
private static final Integer NAMENODE_PORT = 50070;
private volatile Boolean isRunning = true;
private NameNodeServiceGrpc.NameNodeServiceBlockingStub namenode;
public NameNodeRpcClient() {
ManagedChannel channel = NettyChannelBuilder.forAddress(NAMENODE_HOSTNAME, NAMENODE_PORT)
.negotiationType(NegotiationType.PLAINTEXT).build();
this.namenode = NameNodeServiceGrpc.newBlockingStub(channel);
}
/**
* 发送checkpoint信息
*/
public void updateCheckpointTxid(long txid) {
UpdateCheckpointTxidRequest request = UpdateCheckpointTxidRequest.newBuilder()
.setTxid(txid).build();
namenode.updateCheckpointTxid(request);
}
//...
}
再来看下NameNode作为RPC Server的实现:
public class NameNodeServiceImpl extends NameNodeServiceGrpc.NameNodeServiceImplBase {
public static final Integer STATUS_SUCCESS = 1;
public static final Integer STATUS_FAILURE = 2;
public static final Integer STATUS_SHUTDOWN = 3;
// 负责管理元数据的核心组件
private FSNameSystem namesystem;
public NameNodeServiceImpl(FSNameSystem namesystem, DataNodeManager datanodeManager,
EditLogReplicator replicator) {
this.namesystem = namesystem;
this.datanodeManager = datanodeManager;
this.replicator = replicator;
}
/**
* 更新checkpoint信息
*/
@Override
public void updateCheckpointTxid(UpdateCheckpointTxidRequest request,
StreamObserver<UpdateCheckpointTxidResponse> responseObserver) {
long txid = request.getTxid();
// 保存checkpoint中的txid信息
namesystem.setCheckpointTxid(txid);
UpdateCheckpointTxidResponse response = UpdateCheckpointTxidResponse.newBuilder()
.setStatus(1).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
/**
* 优雅停机
*/
@Override
public void shutdown(ShutdownRequest request, StreamObserver<ShutdownResponse> responseObserver) {
isRunning = false;
namesystem.flush();
namesystem.saveCheckpointTxid();
System.out.println("优雅关闭namenode......");
}
//...
}
NameNode接受到RPC请求后,首先在内存中更新checkpoint信息,当NameNode停机时,会将checkpoint信息持久化到磁盘上。这样,当NameNode宕机恢复启动时,就可以读取checkpoint信息和fsimage文件恢复元数据:
/**
* 负责管理元数据的核心组件
*/
public class FSNameSystem {
// 最近一次checkpoint更新到的txid
private long checkpointTxid = 0L;
public FSNameSystem() {
this.directory = new FSDirectory();
this.editlog = new FSEditlog();
// 基于fsimage宕机恢复元数据
recoverNamespace();
}
public void setCheckpointTxid(long txid) {
System.out.println("接收到checkpoint txid:" + txid);
this.checkpointTxid = txid;
}
public long getCheckpointTxid() {
return checkpointTxid;
}
/**
* 将checkpoint txid保存到磁盘上去
*/
public void saveCheckpointTxid() {
String path = "C:\\Users\\Ressmix\\Desktop\\editslog\\checkpoint-txid.meta";
RandomAccessFile raf = null;
FileOutputStream out = null;
FileChannel channel = null;
try {
File file = new File(path);
if(file.exists()) {
file.delete();
}
ByteBuffer buffer = ByteBuffer.wrap(String.valueOf(checkpointTxid).getBytes());
raf = new RandomAccessFile(path, "rw");
out = new FileOutputStream(raf.getFD());
channel = out.getChannel();
channel.write(buffer);
channel.force(false);
} catch(Exception e) {
e.printStackTrace();
} finally {
try {
if(out != null) {
out.close();
}
if(raf != null) {
raf.close();
}
if(channel != null) {
channel.close();
}
} catch (Exception e2) {
e2.printStackTrace();
}
}
}
//...
}
2.6 持久化checkpoint信息
BackupNode将checkpoint信息发送给NameNode后,最后一步是持久化保存本次checkpoint信息:
// FSImageCheckPointer.java
/**
* 持久化checkpoint信息
*/
private void saveCheckpointInfo(FSImage fsimage) {
String path = "C:\\Users\\Ressmix\\Desktop\\backupnode\\checkpoint-info.meta";
RandomAccessFile raf = null;
FileOutputStream out = null;
FileChannel channel = null;
try {
File file = new File(path);
if (file.exists()) {
file.delete();
}
long time = namesystem.getCheckPoint().getCheckpointTime();
long checkpointTxid = namesystem.getCheckPoint().getSyncedTxid();
String lastFsimageFile = namesystem.getCheckPoint().getFsimageFile();
ByteBuffer buffer = ByteBuffer.wrap(String.valueOf(time + "_" + checkpointTxid + "_" + lastFsimageFile).getBytes());
raf = new RandomAccessFile(path, "rw");
out = new FileOutputStream(raf.getFD());
channel = out.getChannel();
channel.write(buffer);
channel.force(false);
System.out.println("checkpoint信息持久化到磁盘文件......");
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
if (out != null) {
out.close();
}
if (raf != null) {
raf.close();
}
if (channel != null) {
channel.close();
}
} catch (Exception e2) {
e2.printStackTrace();
}
}
}
三、总结
本章,我BackupNode的checkpoint机制进行了深入详尽的讲解。BackupNode主要依赖一个后台线程定期执行checkpoint机制。下一章,我将带大家实现遗留的fsimage文件传输与namenode的宕机恢复机制。