本章开始,我将对分布式文件系统的高可用架构进行讲解。我们的分布式文件系统,主要考虑两个地方的高可用:
- NameNode高可用 :NameNode负责管理整个分布式文件系统的元数据,我们使用BackupNode作为它的冷备节点。当NameNode故障时,虽然可以通过edits log和fsimage快照机制保证元数据不会丢失,但是无法实现故障的自动转移,需要手动进行故障节点切换;
- DataNode高可用 :DataNode负责文件的实际存储,我们采用了双副本机制来保障DataNode的高可用,但是如果一个DataNode节点挂掉,可能会导致一批文件只有1个副本。另外,如果DataNode在文件传输过程中出现故障,也可能导致数据丢失。
高可用相关代码,存放在:https://gitee.com/ressmix/source-code/tree/master/5.dfs/3.HA。
一、概述
1.1 NameNode高可用
对于NameNode高可用,我已经采用冷备BackupNode节点实现了,如果需要进行故障转移,我们一般需要执行以下步骤:
- 从BackupNode获取一份最新的fsimage,放到NameNode机器所在的指定目录;
- 启动NameNode进程,从fsimage加载元数据。
显然,上述这种方式十分傻瓜,只能说在一些中小公司内部使用。要真正做到NameNode的故障自动转移,我们一般需要引入Zookeeper这类分布式协调中间件,当NameNode启动后,自动将节点信息注册到Zookeeper中,其它的BackupNode和DataNode等统一从Zookeeper感知NameNode节点的变化。
本系列,我就不带大家去实现NameNode的故障自动转移了,因为涉及分布式注册中心的很多概念和底层原理的讲解。后续,我会开一个《透彻理解分布式注册中心》专栏,对Zookeeper的底层原理进行讲解,并自己动手实现一个注册中心。
1.2 DataNode高可用
DataNode高可用,在我们的 双副本 架构下,主要需要考虑以下三点:
- 如果DFS客户端在传输文件过程中,DataNode挂了,客户端该如何处理?
- DataNode宕机时,NameNode如何将它上面的文件分配给其它正常的DataNode节点?
- DataNode恢复后,NameNode如何让该节点删除上面的副本?
第一个问题属于 中断异常处理 ,通常处理方式是,客户端重新向NameNode请求获取一个可用的DataNode,然后再次进行传输,我将在下一章讲解具体的处理流程。
第二、三个问题的本质是 文件副本重分配 ,本章我就对该问题的处理进行讲解。
二、文件副本重分配
我们先来看下副本重分配的整体处理流程:
- 首先,NameNode需要在内存中记录每个DataNode包含哪些文件,这一过程可以通过增量/全量信息上报完成;
- 接着,NameNode在检测DataNode的心跳时,如果发现某个DataNode失活了,就需要删除内存中该节点的元数据,同时对该节点上的每个文件都创建一个副本分配任务,任务包含源DataNode、目标DataNode、文件元数据等信息;
- 然后,NameNode将副本分配任务扔到内存队列中,当各个DataNode发送心跳时,会获取目标节点为自身的任务,然后执行文件复制,复制完成后进行一次增量信息上报;
- 最后,当那个宕机的DataNode恢复后,会向NameNode进行注册并全量上报自身信息,此时NameNode发现它的某个文件已经存在于其它的两个副本中了,就要求该DataNode删除自身的副本文件。
接着,我们来实现上述重分配的代码。
2.1 创建重分配任务
首先,我们需要改造下NameNode的心跳检测线程——DataNodeAliveMonitor。当DataNodeAliveMonitor检测有DataNode失活后,需要进行以下处理:
-
遍历该DataNode上的所有文件,为每个文件创建一个副本复制任务;
- 首先,确认执行复制任务的源数据节点;
- 接着,确认执行复制任务的目标数据节点;
- 最后,将副本复制任务扔到目标数据节点内部的一个队列中缓存。
-
从DataNode集群元数据中删除该失活DataNode节点。
// DataNodeAliveMonitor.java
private class DataNodeAliveMonitor extends Thread {
@Override
public void run() {
try {
while (true) {
List<DataNodeInfo> toRemoveDatanodes = new ArrayList<>();
Iterator<DataNodeInfo> datanodesIterator = datanodes.values().iterator();
DataNodeInfo datanode = null;
while (datanodesIterator.hasNext()) {
datanode = datanodesIterator.next();
// 遍历保存的DataNode节点,如果超过90秒未上送心跳,则移除
if (System.currentTimeMillis() - datanode.getLatestHeartbeatTime() > 90 * 1000) {
toRemoveDatanodes.add(datanode);
}
}
if (!toRemoveDatanodes.isEmpty()) {
for (DataNodeInfo toRemoveDatanode : toRemoveDatanodes) {
System.out.println("数据节点【" + toRemoveDatanode + "】宕机,需要进行副本复制......");
// 1.创建一个副本复制任务
createLostReplicaTask(toRemoveDatanode);
// 2.从DataNode集群元数据中删除该节点
removeDeadDataNode(toRemoveDatanode);
}
}
// 每隔30秒检测一次
Thread.sleep(30 * 1000);
}
} catch (Exception e) {
e.printStackTrace();
}
}
副本复制任务的创建,关键是要确定好源DataNode和目标DataNode:
// DataNodeAliveMonitor.java
private void createLostReplicaTask(DataNodeInfo deadDataNode) {
List<String> files = fileMappedByDataNode.get(deadDataNode.getId());
// 为每个文件创建一个副本复制任务
for (String file : files) {
String filename = file.split("_")[0];
Long filesize = Long.valueOf(file.split("_")[1]);
// 获取复制任务的源数据节点
DataNodeInfo sourceDatanode = getReplicateSource(filename, deadDataNode);
// 获取复制任务的目标数据节点
DataNodeInfo destDatanode = getReplicateTarget(filesize, sourceDatanode, deadDataNode);
// 创建复制任务
ReplicateTask replicateTask = new ReplicateTask(filename, filesize, sourceDatanode,
destDatanode);
// 将复制任务放到目标数据节点的任务队列里去
destDatanode.addReplicateTask(replicateTask);
System.out.println("为目标数据节点生成一个副本复制任务," + replicateTask);
}
}
/**
* 获取复制任务的源数据节点,即挂掉的DataNode的存活镜像节点
*/
public DataNodeInfo getReplicateSource(String filename, DataNodeInfo deadDatanode) {
DataNodeInfo replicateSource = null;
try {
rrw.readLock().lock();
List<DataNodeInfo> replicas = datanodeMappedByFile.get(filename);
for (DataNodeInfo replica : replicas) {
if (!replica.equals(deadDatanode)) {
replicateSource = replica;
break;
}
}
} finally {
rrw.readLock().unlock();
}
return replicateSource;
}
/**
* 获取复制任务的目标节点
*/
private DataNodeInfo getReplicateTarget(Long filesize, DataNodeInfo sourceDatanode, DataNodeInfo deadDataNode) {
synchronized (this) {
DataNodeInfo targetDataNode = null;
for (DataNodeInfo datanode : datanodes.values()) {
// 目标节点不能是源节点,也不能是挂掉的节点
if (!datanode.equals(sourceDatanode) && !datanode.equals(deadDataNode)) {
targetDataNode = datanode;
break;
}
}
if (targetDataNode != null) {
targetDataNode.addStoredDataSize(filesize);
}
return targetDataNode;
}
}
我们之前讲过,每个文件都会有两个副本,分别保存在两个DataNode节点上。那么当一个DataNode节点宕机后,就要选一个剩余的存活DataNode节点作为源节点。而目标节点的选取,只要保证不是源节点和挂掉的节点即可。
最后,重分配任务会缓存到目前节点的一个内存队列中:
// 重分配任务
public class ReplicateTask {
private String filename;
private Long fileLength;
private DataNodeInfo sourceDatanode;
private DataNodeInfo destDatanode;
//...
}
public class DataNodeInfo implements Comparable<DataNodeInfo> {
/**
* 副本复制任务队列
*/
private ConcurrentLinkedQueue<ReplicateTask> replicateTaskQueue = new ConcurrentLinkedQueue<>();
public void addReplicateTask(ReplicateTask replicateTask) {
replicateTaskQueue.offer(replicateTask);
}
public ReplicateTask pollReplicateTask() {
if(!replicateTaskQueue.isEmpty()) {
return replicateTaskQueue.poll();
}
return null;
}
//...
}
2.2 重分配任务下发
接着,我们来看副本重分配任务的下发过程。当DataNode向NameNode发送心跳时,NameNode会查看是否存在以当前DataNode作为目标节点的 副本分配任务 ,如果有,就通过心跳响应任务信息:
// NameNodeServiceImpl.java
/**
* DataNode心跳
*/
@Override
public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
// 使用DataNodeManager组件完成DataNode心跳
boolean succ = datanodeManager.heartbeat(request.getIp(), request.getHostname(), request.getNioPort());
HeartbeatResponse response = null;
List<Command> cmdList = new ArrayList<>();
if (succ) {
// 1.判断当前目标节点是否包含副本复制任务
DataNodeInfo datanode = datanodeManager.getDataNodeInfo(request.getIp(), request.getHostname());
ReplicateTask replicateTask = null;
while ((replicateTask = datanode.pollReplicateTask()) != null) {
Command command = new Command(Command.REPLICATE);
command.setContent(JSONObject.toJSONString(replicateTask));
cmdList.add(command);
}
// 2.判断当前目标节点是否包含副本删除任务
RemoveReplicaTask removeReplicaTask = null;
while ((removeReplicaTask = datanode.pollRemoveReplicaTask()) != null) {
Command removeReplicaCommand = new Command(Command.REMOVE_REPLICA);
removeReplicaCommand.setContent(JSONObject.toJSONString(removeReplicaTask));
cmdList.add(removeReplicaCommand);
}
System.out.println("接收到数据节点【" + datanode + "】的心跳,他的命令列表为:" + cmdList);
response = HeartbeatResponse.newBuilder()
.setStatus(STATUS_SUCCESS)
.setCommands(JSONArray.toJSONString(cmdList))
.build();
} else {
// 服务注册任务
Command regCmd = new Command(Command.REGISTER);
// 全量上报任务
Command fullReportCmd = new Command(Command.REPORT_COMPLETE_STORAGE_INFO);
cmdList.add(regCmd);
cmdList.add(fullReportCmd);
response = HeartbeatResponse.newBuilder()
.setStatus(STATUS_FAILURE)
.setCommands(JSONArray.toJSONString(cmdList))
.build();
}
responseObserver.onNext(response);
responseObserver.onCompleted();
}
当DataNode收到响应后,需要根据任务信息执行副本重分配流程,即从源DataNode节点下载文件到本地保存:
// LeaseManager.java
/**
* 负责心跳的线程
*/
private class HeartbeatThread extends Thread {
@Override
public void run() {
while (true) {
try {
// 发送心跳
HeartbeatResponse response = rpcClient.heartbeat();
if (response.getStatus() == STATUS_SUCCESS) {
JSONArray commands = JSONArray.parseArray(response.getCommands());
if (commands.size() > 0) {
for (int i = 0; i < commands.size(); i++) {
JSONObject command = commands.getJSONObject(i);
Integer type = command.getInteger("type");
JSONObject task = command.getJSONObject("content");
if (type.equals(COMMAND_REPLICATE)) {
replicateManager.addReplicateTask(task);
System.out.println("接收副本复制任务," + command);
} else if (type.equals(COMMAND_REMOVE_REPLICA)) {
System.out.println("接收副本删除任务," + command);
String filename = task.getString("filename");
String absoluteFilename = DataNodeInfo.getAbsoluteFilename(filename);
File file = new File(absoluteFilename);
if (file.exists()) {
file.delete();
}
}
}
}
}else if(response.getStatus()==STATUS_FAILURE){
JSONArray commands = JSONArray.parseArray(response.getCommands());
for(int i = 0; i < commands.size(); i++) {
JSONObject command = commands.getJSONObject(i);
Integer type = command.getInteger("type");
// 如果是注册的命令
if(type.equals(COMMAND_REGISTER)) {
rpcClient.register();
}
// 如果是全量上报的命令
else if(type.equals(COMMAND_REPORT_COMPLETE_STORAGE_INFO)) {
DataNodeInfo storageInfo = storageManager.getStorageInfo();
rpcClient.fullyReportDataNodeInfo(storageInfo);
}
}
}
Thread.sleep(30 * 1000);
} catch (Exception e) {
System.out.println("当前NameNode不可用,心跳失败.......");
}
}
}
}
ReplicateManager是DataNode侧实际执行副本复制的一个管理类,它会先将任务扔到一个内存队列中,然后通过线程池并发获取任务执行:
/**
* 副本复制管理组件
*/
public class ReplicateManager {
public static final Integer REPLICATE_THREAD_NUM = 3;
private DFSNIOClient nioClient = new DFSNIOClient();
private NameNodeRpcClient namenodeRpcClient;
/**
* 副本复制任务队列
*/
private ConcurrentLinkedQueue<JSONObject> replicateTaskQueue = new ConcurrentLinkedQueue<>();
public ReplicateManager(NameNodeRpcClient namenodeRpcClient) {
this.namenodeRpcClient = namenodeRpcClient;
for (int i = 0; i < REPLICATE_THREAD_NUM; i++) {
new ReplicateWorker().start();
}
}
/**
* 添加复制任务
*/
public void addReplicateTask(JSONObject replicateTask) {
replicateTaskQueue.offer(replicateTask);
}
/**
* 副本复制线程
*/
class ReplicateWorker extends Thread {
@Override
public void run() {
while (true) {
FileOutputStream imageOut = null;
FileChannel imageChannel = null;
try {
JSONObject replicateTask = replicateTaskQueue.poll();
if (replicateTask == null) {
Thread.sleep(1000);
continue;
}
System.out.println("开始执行副本复制任务......");
// 解析复制任务
String filename = replicateTask.getString("filename");
Long fileLength = replicateTask.getLong("fileLength");
JSONObject sourceDatanode = replicateTask.getJSONObject("sourceDatanode");
String hostname = sourceDatanode.getString("hostname");
Integer nioPort = sourceDatanode.getInteger("nioPort");
// 跟源头数据接头通信读取图片过来
byte[] file = nioClient.readFile(hostname, nioPort, filename);
ByteBuffer fileBuffer = ByteBuffer.wrap(file);
System.out.println("从源头数据节点读取到图片,大小为:" + file.length + "字节");
// 根据文件的相对路径定位到绝对路径,写入本地磁盘文件中
String absoluteFilename = DataNodeInfo.getAbsoluteFilename(filename);
imageOut = new FileOutputStream(absoluteFilename);
imageChannel = imageOut.getChannel();
imageChannel.write(fileBuffer);
System.out.println("将图片写入本地磁盘文件,路径为:" + absoluteFilename);
// 进行增量上报
namenodeRpcClient.deltaReportDataNodeInfo(filename, fileLength);
System.out.println("向Master节点进行增量上报......");
} catch (Exception e) {
e.printStackTrace();
} finally {
//...
}
}
}
}
}
2.3 删除任务下发
最后,我们来看下删除任务下发的流程。当各个DataNode向NameNode进行增量或全量上报自身信息时,NameNode侧如果发现某个文件已经存在足够的副本,那么这个DataNode很可能是宕机后恢复的,所以会要求它删除自己的那个文件副本。
增量上报和全量上报的改造逻辑如下:
- 检查当前上报节点上送的文件信息,判断该文件是否已经存在于2个以上的DataNode中;
- 如果是,则创建一个RemoveReplicaTask任务;
- 当下次该DataNode发送心跳时,就会获取到该任务,完成本地文件的删除。
// DataNodeManager.java
/**
* 增量上报
*/
public void deltaReportDataNodeInfo(String ip, String hostname, String filename, Long fileLength) {
try {
rrw.writeLock().lock();
List<DataNodeInfo> replicas = datanodeMappedByFile.get(filename);
if (replicas == null) {
replicas = new ArrayList<>();
datanodeMappedByFile.put(filename, replicas);
}
DataNodeInfo datanode = this.getDataNodeInfo(ip, hostname);
// 检查当前文件的副本数量是否超标
if (replicas.size() == REPLICA_NUM) {
// 减少这个节点上的存储数据量
datanode.addStoredDataSize(-fileLength);
// 生成文件删除任务
RemoveReplicaTask removeReplicaTask = new RemoveReplicaTask(filename, datanode);
datanode.addRemoveReplicaTask(removeReplicaTask);
} else {
// 如果副本数量未超标,才会将副本放入数据结构中
replicas.add(datanode);
// 维护每个数据节点拥有的文件副本
List<String> files = fileMappedByDataNode.get(ip + "-" + hostname);
if (files == null) {
files = new ArrayList<String>();
fileMappedByDataNode.put(ip + "-" + hostname, files);
}
files.add(filename + "_" + fileLength);
}
System.out.println("收到DataNode增量上报信息,当前的副本信息为:" + datanodeMappedByFile);
} finally {
rrw.writeLock().unlock();
}
}
/**
* 全量上报
*/
public void fullyReportDataNodeInfo(String ip, String hostname, List<String> filenameList, Long storedDataSize) {
DataNodeInfo datanode = this.getDataNodeInfo(ip, hostname);
datanode.setStoredDataSize(storedDataSize);
for (String filename : filenameList) {
this.deltaReportDataNodeInfo(hostname, ip, filename, storedDataSize);
}
}
三、总结
本章,我对分布式文件系统的高可用架构进行了讲解,主要讲解了DataNode的高可用。DataNode在实现高可用时,最核心的就是它的文件副本重分配机制。
我们解决该问题的思路就是:NameNode对于异常DataNode节点,创建 文件副本转移任务 和 文件副本删除任务 ,然后利用心跳,向DataNode下发这些任务,完成文件迁移和删除。