分布式文件系统的文件存储的最后一块内容,就是实现DataNode的信息上报。我在前面章节已经讲过,DataNode信息上报分为 增量信息上报 和 全量信息上报 ,增量信息上报在文件上传完成后由DataNodeNIOServer
调用,全量信息上报则需要在DataNode首次注册时完成。
一、信息上报
我们首先来看DataNode的启动。DataNode内部封装了很多核心组件,与信息上报最相关的就是LeaseManager
和StorageManager
:
/**
* DataNode启动类
*/
public class DataNode {
private volatile Boolean isRunning;
// 与NameNode进行RPC通信的组件
private NameNodeRpcClient rpcClient;
// 负责文件存储管理的组件
private StorageManager storageManager;
// 负责心跳和注册管理的组件
private LeaseManager leaseManager;
// 负责NIO通信的组件
private DataNodeNIOServer nioServer;
public static void main(String[] args) {
DataNode datanode = new DataNode();
datanode.initialize();
datanode.start();
}
/**
* 初始化DataNode
*/
private void initialize() {
this.rpcClient = new NameNodeRpcClient();
this.storageManager = new StorageManager();
this.nioServer = new DataNodeNIOServer(rpcClient);
this.leaseManager = new LeaseManager(rpcClient, storageManager);
// 1.启动后立即进行一次注册
leaseManager.register();
// 2.开启心跳
leaseManager.heartbeat();
// 3.开启NIO Server
nioServer.start();
}
/**
* 运行DataNode
*/
private void start() {
this.isRunning = true;
try {
while (isRunning) {
Thread.sleep(1000);
}
} catch (Exception e) {
this.isRunning = false;
e.printStackTrace();
}
}
}
1.1 全量上报
DataNode初始化过程中,会立即调用LeaseManager的register
方法进行一次服务注册,服务注册成功后就会进行一次全量DataNode信息上报:
/**
* Data的服务注册和心跳管理组件
*/
public class LeaseManager {
// 首次注册成功
public static final Integer REGISTER_FIRST_SUCCESS = 10000;
// 已经注册过
public static final Integer REGISTER_EXIST = 10001;
// 首次心跳(还没注册)
public static final Integer RENEW_FIRST = 10003;
// 已经存在且心跳成功
public static final Integer RENEW_SUCCESS = 10004;
private NameNodeRpcClient rpcClient;
private StorageManager storageManager;
public LeaseManager(NameNodeRpcClient rpcClient, StorageManager storageManager) {
this.rpcClient = rpcClient;
this.storageManager = storageManager;
}
public Boolean register() {
RegisterResponse response = rpcClient.register();
if (response.getStatus() == REGISTER_FIRST_SUCCESS) {
// 首次注册成功
System.out.println("首次注册成功,需要全量上报存储信息......");
DataNodeInfo dataNodeInfo = storageManager.getStorageInfo();
rpcClient.fullyReportDataNodeInfo(dataNodeInfo);
return true;
} else {
// 节点已经存在不能重复注册
System.out.println("节点已注册,不需要全量上报存储信息......");
return false;
}
}
public void heartbeat() {
new HeartbeatThread().start();
}
/**
* 负责心跳的线程
*/
private class HeartbeatThread extends Thread {
@Override
public void run() {
while (true) {
try {
// 发送心跳
HeartbeatResponse response = rpcClient.heartbeat();
if (response.getStatus() == RENEW_FIRST) {
//心跳失败,节点还没注册
register();
}
Thread.sleep(30 * 1000);
} catch (Exception e) {
System.out.println("当前NameNode不可用,心跳失败.......");
}
}
}
}
}
我们来思考一下,NameNode接受到DataNode的服务注册请求后,会做哪些判断和处理?
- NameNode缓存有该DataNode的信息,也就说该DataNode已经注册过,则不再进行全量上报;
- NameNode中不存在该DataNode的信息,可能是DataNode从未注册,也可能是DataNode注册后长时间未发送心跳被剔除了,则需要进行全量上报;
针对这两类请求,NameNode会返回不同的响应码,而DataNode就需要针对首次注册的响应做处理。全量上报所需的DataNode信息由StorageManager
组件生成:
/**
* 负责文件存储管理的组件
*/
public class StorageManager {
/**
* 获取DataNode的存储信息
*/
public DataNodeInfo getStorageInfo() {
DataNodeInfo dataNodeInfo = new DataNodeInfo();
File rootDir = new File(DATA_DIR);
File[] children = rootDir.listFiles();
if (children != null || children.length > 0) {
for (File child : children) {
scanFiles(child, dataNodeInfo);
}
}
return dataNodeInfo;
}
/*---------------------------------PRIVATE METHOD-------------------------------*/
/**
* 递归扫描文件信息
*
* @param rootDir
*/
private void scanFiles(File rootDir, DataNodeInfo dataNodeInfo) {
if (rootDir.isFile()) {
String path = rootDir.getPath();
path = path.substring(DATA_DIR.length());
path = path.replace("\\", "/"); // /image/product/iphone.jpg
dataNodeInfo.addFilename(path);
dataNodeInfo.addStoredDataSize(rootDir.length());
return;
}
File[] children = rootDir.listFiles();
if (children == null || children.length == 0) {
return;
}
for (File child : children) {
scanFiles(child, dataNodeInfo);
}
}
}
上述包含两部分:
- 当前DataNode存储的所有文件名列表;
- 当前DataNode存储的所有文件总大小。
这些信息最终会通过NameNodeRpcClient的RPC调用发送给Name Node:
// NameNodeRpcClient.java
/**
* 向NameNode全量上报元数据信息
*/
public void fullyReportDataNodeInfo(DataNodeInfo dataNodeInfo) {
if (dataNodeInfo == null) {
System.out.println("当前没有存储任何文件,不需要全量上报.....");
return;
}
FullyReportRequest request = FullyReportRequest.newBuilder()
.setIp(DATANODE_IP)
.setHostname(DATANODE_HOSTNAME)
.setFilenameList(JSONArray.toJSONString(dataNodeInfo.getFilenames()))
.setStoredDataSize(dataNodeInfo.getStoredDataSize())
.build();
namenode.fullyReportDataNodeInfo(request);
System.out.println("全量上报DataNode信息:" + dataNodeInfo);
}
另外,我们还需要注意下,如果DataNode初始化过程中的那次服务注册失败了,没有完成全量信息的上报。那么,在后续的心跳过程中,如果发现NameNode侧不存在当前DataNode的信息,也会进行全量信息上报:
/**
* 负责心跳的线程
*/
private class HeartbeatThread extends Thread {
@Override
public void run() {
while (true) {
try {
// 发送心跳
HeartbeatResponse response = rpcClient.heartbeat();
if (response.getStatus() == RENEW_FIRST) {
//心跳失败,节点还没注册
register();
}
Thread.sleep(30 * 1000);
} catch (Exception e) {
System.out.println("当前NameNode不可用,心跳失败.......");
}
}
}
}
1.2 增量上报
DataNode增量信息上报,在文件上传的流程中完成:
// DataNodeNIOServer.java
/**
* 工作线程
*/
class Worker extends Thread {
@Override
public void run() {
while (true) {
SocketChannel channel = null;
try {
//...
// 5.3 判断是否读取完毕
if (hasReadImageLength == filesize) {
ByteBuffer outBuffer = ByteBuffer.wrap("SUCCESS".getBytes());
channel.write(outBuffer);
cachedRequest.remove(remoteAddr);
System.out.println("文件读取完毕,返回响应给客户端: " + remoteAddr);
// 增量上报DataNode元数据
rpcClient.deltaReportDataNodeInfo(filename, filesize);
System.out.println("增量上报收到的文件副本给NameNode节点......");
} else {
InflightRequest request = new InflightRequest(filename, filesize, hasReadImageLength);
cachedRequest.put(remoteAddr, request);
key.interestOps(SelectionKey.OP_READ);
System.out.println("文件没有读取完毕,等待下一次OP_READ请求,缓存文件:" + request);
}
} catch (Exception e) {
//...
}
}
}
}
最终由组件NameNodeRpcClient
调用RPC接口上送本次DataNode接受到的文件信息:
// NameNodeRpcClient.java
/**
* 向NameNode增量上报元数据信息
*/
public void deltaReportDataNodeInfo(String filename, long filesize) {
DeltaReportRequest request = DeltaReportRequest.newBuilder()
.setHostname(DATANODE_HOSTNAME)
.setIp(DATANODE_IP)
.setFilename(filename)
.setFilesize(filesize)
.build();
namenode.deltaReportDataNodeInfo(request);
System.out.println("增量上报DataNode信息:" + filename + ", " + filesize);
}
二、总结
本章,我对分布式文件系统的DataNode节点的信息上报功能进行了拆分讲解。至此,整个 文件存储 模块的功能就已经实现了。