从本章开始,我将对分布式文件系统的文件上传功能进行讲解。我们的分布式文件系统的大脑是NameNode,它负责管理整个集群的元数据,而真正负责数据存储的是DataNode。所以,我们在设计文件存储功能时,需要重点考虑以下几个方面的内容:
- 如何在DataNode集群中分布式地存储各种小文件?
- 如何保证DataNode的可用性?
- NameNode如何感知DataNode集群的信息?
- 客户端如何与NameNode、DataNode通信,完成文件的上传和下载?
解决了上述四个问题,我们的分布式文件系统的存储架构也就浮现出来了。本章,我先以文件上传和下载功能为例,讲解分布式文件系统的文件存储全流程,后续章节再带领大家逐步实现各个功能点。
由于文件存储这部分的内容涉及重构前面章节的存量代码,所以我重新上传了一份代码到Gitee上,文件存储部分的代码:https://gitee.com/ressmix/source-code/tree/master/5.dfs/2.storage 。
一、整体架构
对于分布式文件系统的数据存储来说,一般都是采用数据分散集群的架构。我们先来看 文件上传 的整体流程:
- 首先,DFS客户端与NameNode通信,确认文件存储在哪些DataNode节点上;
- 接着,DFS客户端直接与DataNode通信,进行文件的多副本上传;
- 同时,DataNode会不断向NameNode上报自己的存储元数据信息,以便NameNode对DataNode节点的上下线状态进行感知以及负载均衡等。
再来看 文件下载 的整体流程:
- 首先,DFS客户端与NameNode通信,确认要下载的文件分布在哪些DataNode节点上;
- NameNode会根据负载均衡算法,选择一个DataNode节点并返回其信息;
- 最后,DFS客户端通过NIO网络通信,从指定DataNode节点完成文件下载。
关于数据分散集群,我在《分布式系统从理论到实战》系列的专栏中进行过详细讲解,不熟悉的读者可以先去看一看。
1.1 元数据管理
DFS客户端会提供文件上传和下载接口,通过RPC方式与NameNode进行通信,获取需要的DataNode节点信息。对于文件文件,NameNode接受到DFS的请求后,会在内存中维护文件目录树,并写edits log日志。BackupNode同步到日志后,进行回放,并维护fsimage快照文件。所以,这套机制和我前面章节讲解的元数据管理完全相同。
1.2 高可用
DataNode负责实际的数据存储,所以不能是单点。而保证DataNode高可用的最基本思路就是 冗余 ,那么到底需要几个副本来冗余存储呢?
我们来看下Hadoop HDFS的设计,HDFS在设计节点冗余时,考虑了 机架和机房 的设计,默认使用3个副本,其中2个副本分配在同一个机架上,另外一个副本在别的机架上,这样万一某个机架故障了(机架上的所有机器都会出问题),那别的机架上还有一个副本可用。
我们的分布式文件系统不需要设计得这么复杂,我们采用 双副本模式 ,即一个文件存储两份(用两个对等DataNode节点保存),对于一些中等规模的公司来说已经足够了。
1.3 负载均衡
有了副本冗余机制后,NameNode需要在可用的DataNode节点中进行选择,并响应给客户端,那么如何选择呢?这就涉及到了负载均衡算法。
常见的负载均衡算法有很多,比如Round Robin、负载最小、权重等等。对于文件上传,我们可以根据DataNode的 存储数据量 作负载,也就是说,NameNode会从所有可用的DataNode中选出2个数据量最少的节点,返回给客户端;对于文件下载,我们直接随机选择一个DataNode节点返回。
1.4 数据流传输
DFS客户端接受到NameNode的响应后,需要与指定的DataNode通信,完成文件的实际上传/下载操作。那么,应该采用什么样的传输方式呢?对于文件下载,我们可以直接基于NIO短连接通信来实现。而对于文件上传来说,由于涉及多个副本节点,一般有两种方案:
- 管道数据流;
- 基于NIO的多副本依次上传。
Hadoop HDFS是采用的管道数据流。所谓管道数据流,就是说客户端只与一个Node通信,Node之间的数据传输由Node自己完成,这种方式可以避免客户端的网络连接过多,负载过重,同时机器之间传输数据的性能要比客户端直连更高。
我们的分布式文件系统没必要搞那么复杂,我直接基于Java NIO来实现DFS客户端与DataNode之间的网络通信。
1.5 信息上报
DataNode需要定期将自己的信息上报给NameNode,包括自己存储的文件列表、每个文件的信息等等,这样NameNode才能对DataNode的元数据进行管理,从而选择合适的节点返回给DFS客户端。
我们可以基于心跳机制实现信息上报,但是需要考虑NameNode和DataNode宕机等异常情况。因为NameNode一旦宕机,内存中保存的DataNode信息就会丢失。
所以,我们可以设计 增量上报 和 全量上报 两种模式。
- 全量上报:只要NameNode内存中没有DataNode的信息,那么对于DataNode上送的注册/心跳请求,都要求DataNode进行全量上报。这种情况一般有两种可能:
- NameNode首次启动;
- DataNode长时间未发送心跳,导致元数据被NameNode剔除。
- 增量上报:每次DataNode处理完到DFS客户端上传的文件后,立即进行一次增量上报。
二、接口设计
根据对上述分布式文件系统的整体架构分析,我们需要设计以下几类接口:
- DFS客户端的文件上传/下载接口;
- DataNode信息上报接口;
- DataNode心跳和注册接口。
其中DataNode的心跳和注册接口我在前面 元数据管理 的相关章节中已经实现了,后续章节我会根据信息上报的需求对这块的代码进行改造,以支持增量/全量信息上报。
2.1 文件上传接口
文件上传接口分为三部分:
create
RPC接口:DFS客户端首先发送RPC请求给NameNode,上送待上传的文件信息。NameNode收到请求后,在内存中维护文件目录树,并写Edits Log日志;allocateDataNodes
RPC接口:DFS客户端接受到create
请求的成功响应后,发送要上传的文件元数据信息,包括文件名、路径、文件大小等。NameNode收到请求后,根据负载均衡算法返回DataNode列表;sendFile
NIO文件上传接口:DFS客户端接受到DataNode列表后,基于Java NIO与各个DataNode进行通信,上传文件。
上传三个接口的交互如下图:
2.2 文件下载接口
文件下载接口分为两部分:
getDataNodeForFile
接口:DFS客户端首先发送RPC请求给NameNode,上送待下载的文件信息。NameNode收到请求后,随机选择一个DataNode并返回;readFile
NIO文件下载接口:DFS客户端接收到DataNode信息后,基于Java NIO与该DataNode通信,完成文件下载。
2.3 信息上报接口
DataNode的信息上报接口有两个:
deltaReportDataNodeInfo
增量上报接口:DataNode接收到客户端上传的文件后,需要调用该接口上报文件信息给NameNode;fullyReportDataNodeInfo
全量上报接口:DataNode在向NameNode发送服务注册和心跳请求时,会根据NameNode侧是否存在自己的元数据信息,选择是否上报节点的文件列表和数据量等信息。
2.4 注册和心跳接口
我们需要对DataNode的服务注册和心跳的逻辑进行改写,以满足信息的增量/全量上报需求:
register
服务注册接口:DataNode节点首次启动时,进行服务注册;heartbeat
服务心跳接口:默认每隔30s,DataNode向NameNode发送一次心跳请求。
三、DataNode设计
文件存储的核心逻辑都在DataNode侧,我们的DataNode工程如下:
本节,我初步讲解下上面各个类的功能:
- DataNodeConfig :顾名思义,就是保存DataNode的配置;
- DataNodeInfo :DataNode元数据的抽象,包含ip、端口、存储文件列表、存储数据大小等信息;
- StorageManager :负责文件存储管理的组件,提供方法实时获取DataNode的元数据信息;
- LeaseManager :负责心跳和注册管理的组件;
- DataNodeNIOServer :基于Java NIO实现的一个Server,处理文件上传/下载的请求;
- NameNodeRpcClient :基于gRPC的客户端实现,用于与NameNode通信;
- DataNode :DataNode启动类。
四、RPC接口实现
我们针对需要新增的gRPC接口改写protobuf接口文件:
syntax = "proto3";
option java_multiple_files = true;
option java_outer_classname = "NameNodeServiceProto";
service NameNodeService {
rpc register(RegisterRequest) returns (RegisterResponse){}
rpc heartbeat(HeartbeatRequest) returns (HeartbeatResponse){}
rpc mkdir(MkDirRequest) returns (MkDirResponse){}
rpc shutdown(ShutdownRequest) returns (ShutdownResponse){}
rpc fetchEditsLog(FetchEditsLogRequest) returns (FetchEditsLogResponse){}
rpc updateCheckpointTxid(UpdateCheckpointTxidRequest) returns (UpdateCheckpointTxidResponse){}
rpc createFile(CreateFileRequest) returns (CreateFileResponse){}
rpc allocateDataNodes(AllocateDataNodesRequest) returns (AllocateDataNodesResponse){}
rpc getDataNodeForFile(GetDataNodeForFileRequest) returns (GetDataNodeForFileResponse){}
rpc fullyReportDataNodeInfo(FullyReportRequest) returns (FullyReportResponse){}
rpc deltaReportDataNodeInfo(DeltaReportRequest) returns (DeltaReportResponse){}
}
message RegisterRequest{
string ip = 1;
string hostname = 2;
int32 nioPort = 3;
}
message RegisterResponse{
int32 status = 1;
}
message HeartbeatRequest{
string ip = 1;
string hostname = 2;
int32 nioPort = 3;
}
message HeartbeatResponse{
int32 status = 1;
}
message MkDirRequest{
string path = 1;
}
message MkDirResponse{
int32 status = 1;
}
message ShutdownRequest{
int32 code = 1;
}
message ShutdownResponse{
int32 status = 1;
}
message FetchEditsLogRequest{
int64 syncedTxid = 1;
}
message FetchEditsLogResponse{
string editsLog = 1;
}
message UpdateCheckpointTxidRequest{
int64 txid = 1;
}
message UpdateCheckpointTxidResponse{
int32 status = 1;
}
message CreateFileRequest{
string filename = 1;
}
message CreateFileResponse{
int32 status = 1;
}
message AllocateDataNodesRequest{
string filename = 1;
int64 filesize = 2;
}
message AllocateDataNodesResponse{
int32 status = 1;
string datanodes = 2;
}
message GetDataNodeForFileRequest{
string filename = 1;
}
message GetDataNodeForFileResponse{
int32 status = 1;
string datanode = 2;
}
message FullyReportRequest{
string filenameList = 1;
int64 storedDataSize = 2;
string hostname = 3;
string ip = 4;
}
message FullyReportResponse{
int32 status = 1;
}
message DeltaReportRequest{
string filename = 1;
int64 filesize = 2;
string hostname = 3;
string ip = 4;
}
message DeltaReportResponse{
int32 status = 1;
}
接着,在dfs-client
工程中,新增createFile
、allocateDataNodes
、getDataNodeForFile
接口调用实现:
// NameNodeRpcClient.java
/**
* 创建文件
*/
@Override
public void createFile(CreateFileRequest request, StreamObserver<CreateFileResponse> responseObserver) {
try {
CreateFileResponse response = null;
if (!isRunning) {
response = CreateFileResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
} else {
String filename = request.getFilename();
Boolean success = namesystem.createFile(filename);
if (success) {
response = CreateFileResponse.newBuilder().setStatus(STATUS_SUCCESS).build();
} else {
response = CreateFileResponse.newBuilder().setStatus(STATUS_FAILURE).build();
}
}
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取可供上传的DataNode节点及副本
*/
@Override
public void allocateDataNodes(AllocateDataNodesRequest request, StreamObserver<AllocateDataNodesResponse> responseObserver) {
try {
AllocateDataNodesResponse response = null;
if (!isRunning) {
response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
} else {
long fileSize = request.getFilesize();
List<DataNodeInfo> datanodes = datanodeManager.allocateDataNodes(fileSize);
String datanodesJson = JSONArray.toJSONString(datanodes);
response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SUCCESS).setDatanodes(datanodesJson).build();
}
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 下载文件
*/
@Override
public void getDataNodeForFile(GetDataNodeForFileRequest request, StreamObserver<GetDataNodeForFileResponse> responseObserver) {
String filename = request.getFilename();
DataNodeInfo datanode = datanodeManager.getDataNodeForFile(filename);
GetDataNodeForFileResponse response = GetDataNodeForFileResponse.newBuilder()
.setDatanode(JSONObject.toJSONString(datanode))
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
3.1 上传DataNode分配
NameNode侧对于接受到的文件上传请求,需要记录元数据,并分配可用的DataNode双副本:
// NameNodeServiceImpl.java
/**
* 创建文件
*/
@Override
public void createFile(CreateFileRequest request, StreamObserver<CreateFileResponse> responseObserver) {
try {
CreateFileResponse response = null;
if (!isRunning) {
response = CreateFileResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
} else {
String filename = request.getFilename();
Boolean success = namesystem.createFile(filename);
if (success) {
response = CreateFileResponse.newBuilder().setStatus(STATUS_SUCCESS).build();
} else {
response = CreateFileResponse.newBuilder().setStatus(STATUS_FAILURE).build();
}
}
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 获取可供上传的DataNode节点及副本
*/
@Override
public void allocateDataNodes(AllocateDataNodesRequest request, StreamObserver<AllocateDataNodesResponse> responseObserver) {
try {
AllocateDataNodesResponse response = null;
if (!isRunning) {
response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
} else {
long fileSize = request.getFilesize();
List<DataNodeInfo> datanodes = datanodeManager.allocateDataNodes(fileSize);
String datanodesJson = JSONArray.toJSONString(datanodes);
response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SUCCESS).setDatanodes(datanodesJson).build();
}
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
e.printStackTrace();
}
}
上述代码分别时调用了FSNameSystem.createFile
、DataNodeManager.allocateDataNodes
完成对应的功能:
// DataNodeManager.java
/**
* 获取可供上传的DataNode节点及副本
*/
@Override
public void allocateDataNodes(AllocateDataNodesRequest request, StreamObserver<AllocateDataNodesResponse> responseObserver) {
try {
AllocateDataNodesResponse response = null;
if (!isRunning) {
response = AllocateDataNodesResponse.newBuilder().setStatus(STATUS_SHUTDOWN).build();
} else {
long fileSize = request.getFilesize();
List<DataNodeInfo> datanodes = datanodeManager.allocateDataNodes(fileSize);
String datanodesJson = JSONArray.toJSONString(datanodes);
response = AllocateDataNodesResponse.newBuilder()
.setStatus(STATUS_SUCCESS).setDatanodes(datanodesJson).build();
}
responseObserver.onNext(response);
responseObserver.onCompleted();
} catch (Exception e) {
e.printStackTrace();
}
}
FSNameSystem.createFile
属于维护文件目录树的操作,其中包含了记录edits log日志,BackupNode会从NameNode同步日志并进行回放,这块内容我就不再赘述了,读者可以自行研读代码:
// FSNameSystem.java
/**
* 创建文件
*/
public Boolean createFile(String filename) throws Exception {
if (!directory.createFile(filename)) {
return false;
}
editlog.logEdit(EditLogFactory.createFile(filename));
return true;
}
我们重点来看NameNode是如何选择供上传的DataNode节点的。这一工作由DataNodeManager
组件完成,DataNodeManager在内存中维护整个DataNode集群的信息,然后根据数据存储量选择两个数据量最小的节点返回:
public class DataNodeManager {
// 集群中所有的DataNode信息,Key为IP-HOSTNAME
private Map<String, DataNodeInfo> datanodes = new ConcurrentHashMap<>();
/**
* 获取可用的DataNode节点
*/
public List<DataNodeInfo> allocateDataNodes(long fileSize) {
synchronized (this) {
List<DataNodeInfo> datanodeList = new ArrayList<>();
for (DataNodeInfo datanode : datanodes.values()) {
datanodeList.add(datanode);
}
Collections.sort(datanodeList);
// 选择存储数据最少的头两个datanode出来
List<DataNodeInfo> selectedDatanodes = new ArrayList<>();
if (datanodeList.size() >= 2) {
selectedDatanodes.add(datanodeList.get(0));
selectedDatanodes.add(datanodeList.get(1));
// 增加节点存储数据的大小
datanodeList.get(0).addStoredDataSize(fileSize);
datanodeList.get(1).addStoredDataSize(fileSize);
}
return selectedDatanodes;
}
}
//...
}
3.2 下载DataNode分配
再来看NameNode侧是如何处理下载文件的RPC请求的:
// NameNodeServiceImpl.java
/**
* 下载文件
*/
@Override
public void getDataNodeForFile(GetDataNodeForFileRequest request, StreamObserver<GetDataNodeForFileResponse> responseObserver) {
String filename = request.getFilename();
DataNodeInfo datanode = datanodeManager.getDataNodeForFile(filename);
GetDataNodeForFileResponse response = GetDataNodeForFileResponse.newBuilder()
.setDatanode(JSONObject.toJSONString(datanode))
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
上述代码还是调用了DatanodeManager的接口来完成功能,这里我用了读写锁,以提升并发处理性能:
// DatanodeManager
// 文件与DataNode的映射,Key为文件名
private Map<String, List<DataNodeInfo>> datanodeMappedByFile = new HashMap<>();
/**
* 获取可供下载的DataNode节点
*/
public DataNodeInfo getDataNodeForFile(String filename) {
try {
rrw.readLock().lock();
List<DataNodeInfo> datanodes = datanodeMappedByFile.get(filename);
int size = datanodes.size();
// 随机选择一个
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(size);
return datanodes.get(index);
} finally {
rrw.readLock().lock();
}
}
3.3 服务注册和心跳
我们需要对原来的服务注册和心跳接口进行改写,增加上送DataNode的nioPort
信息,所谓nioPort
,就是DataNode会在启动时开启一个NIO Server线程,用于实际接受客户端文件上传/下载的数据流。
我们先看下DataNode作为客户端侧的实现:
// NameNodeRpcClient.java
private NameNodeServiceGrpc.NameNodeServiceBlockingStub namenode;
/**
* 向NameNode进行一次注册
*/
public RegisterResponse register() {
RegisterRequest request = RegisterRequest.newBuilder()
.setIp(DATANODE_IP)
.setHostname(DATANODE_HOSTNAME)
.setNioPort(NIO_PORT)
.build();
RegisterResponse response = namenode.register(request);
System.out.println("完成向NameNode的注册,响应消息为:" + response.getStatus());
return response;
}
/**
* 向NameNode发送一次心跳
*/
public HeartbeatResponse heartbeat() {
HeartbeatRequest request = HeartbeatRequest.newBuilder()
.setIp(DATANODE_IP)
.setHostname(DATANODE_HOSTNAME)
.setNioPort(NIO_PORT)
.build();
return namenode.heartbeat(request);
}
再来看NameNode侧的RPC服务实现:
/**
* NameNode的RPC服务接口
*/
public class NameNodeServiceImpl extends NameNodeServiceGrpc.NameNodeServiceImplBase {
// 成功
public static final Integer STATUS_SUCCESS = 1;
// 失败
public static final Integer STATUS_FAILURE = 2;
// 已停机
public static final Integer STATUS_SHUTDOWN = 3;
/**
* DataNode注册
*/
@Override
public void register(RegisterRequest request, StreamObserver<RegisterResponse> responseObserver) {
// 使用DataNodeManager组件完成DataNode注册
Integer status = datanodeManager.register(request.getIp(), request.getHostname(), request.getNioPort());
RegisterResponse response = RegisterResponse.newBuilder().setStatus(status).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
/**
* DataNode心跳
*/
@Override
public void heartbeat(HeartbeatRequest request, StreamObserver<HeartbeatResponse> responseObserver) {
// 使用DataNodeManager组件完成DataNode心跳
Integer status = datanodeManager.heartbeat(request.getIp(), request.getHostname(), request.getNioPort());
HeartbeatResponse response = HeartbeatResponse.newBuilder().setStatus(status).build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
//...
}
上述代码使用组件DatanodeManager
完成服务注册和心跳的功能,本质就是维护内存中的DataNode信息的Hash表:
/**
* 负责管理集群里的所有DataNode
*/
public class DataNodeManager {
// 首次注册
public static final Integer REGISTER_FIRST = 10000;
// 已经存在且注册成功
public static final Integer REGISTER_SUCCESS_EXIST = 10001;
// 首次心跳
public static final Integer RENEW_FIRST = 10003;
// 已经存在且心跳成功
public static final Integer RENEW_SUCCESS_EXIST = 10004;
// 集群中所有的DataNode信息,Key为IP-HOSTNAME
private Map<String, DataNodeInfo> datanodes = new ConcurrentHashMap<>();
// 文件与DataNode的映射,Key为文件名
private Map<String, List<DataNodeInfo>> datanodeMappedByFile = new HashMap<>();
/**
* datanode注册
*/
public Integer register(String ip, String hostname, int nioPort) {
if (containDataNode(ip, hostname)) {
System.out.println("当前已经存在这个DataNode了......");
return REGISTER_SUCCESS_EXIST;
}
setDataNodeInfo(ip, hostname, new DataNodeInfo(ip, hostname, nioPort));
System.out.println("DataNode注册:ip=" + ip + ",hostname=" + hostname + ", nioPort=" + nioPort);
return REGISTER_FIRST;
}
/**
* datanode心跳
*/
public Integer heartbeat(String ip, String hostname, int nioPort) {
DataNodeInfo datanode = getDataNodeInfo(ip, hostname);
if (datanode == null) {
return RENEW_FIRST;
}
datanode.setLatestHeartbeatTime(System.currentTimeMillis());
System.out.println("DataNode发送心跳:ip=" + ip + ",hostname=" + hostname);
return RENEW_SUCCESS_EXIST;
}
public DataNodeInfo getDataNodeInfo(String ip, String hostname) {
return datanodes.get(ip + "-" + hostname);
}
public void setDataNodeInfo(String ip, String hostname, DataNodeInfo dataNodeInfo) {
datanodes.put(ip + "-" + hostname, dataNodeInfo);
}
public Boolean containDataNode(String ip, String hostname) {
return datanodes.containsKey(ip + "-" + hostname);
}
//...
}
3.4 信息上报
DataNode的信息上报分为 全量上报 和 增量上报 ,这部分功能我放到后面章节与DataNode一起统一讲解。本章,我们就简单看一下DataNode侧的gRPC调用实现:
/**
* 与NameNode进行RPC通信的组件
*/
public class NameNodeRpcClient {
private NameNodeServiceGrpc.NameNodeServiceBlockingStub namenode;
public NameNodeRpcClient() {
ManagedChannel channel = NettyChannelBuilder
.forAddress(NAMENODE_HOSTNAME, NAMENODE_PORT)
.negotiationType(NegotiationType.PLAINTEXT)
.build();
this.namenode = NameNodeServiceGrpc.newBlockingStub(channel);
}
/**
* 向NameNode全量上报元数据信息
*/
public void fullyReportDataNodeInfo(DataNodeInfo dataNodeInfo) {
if (dataNodeInfo == null) {
System.out.println("当前没有存储任何文件,不需要全量上报.....");
return;
}
FullyReportRequest request = FullyReportRequest.newBuilder()
.setIp(DATANODE_IP)
.setHostname(DATANODE_HOSTNAME)
.setFilenameList(JSONArray.toJSONString(dataNodeInfo.getFilenames()))
.setStoredDataSize(dataNodeInfo.getStoredDataSize())
.build();
namenode.fullyReportDataNodeInfo(request);
System.out.println("全量上报DataNode信息:" + dataNodeInfo);
}
/**
* 向NameNode增量上报元数据信息
*/
public void deltaReportDataNodeInfo(String filename, long filesize) {
DeltaReportRequest request = DeltaReportRequest.newBuilder()
.setHostname(DATANODE_HOSTNAME)
.setIp(DATANODE_IP)
.setFilename(filename)
.setFilesize(filesize)
.build();
namenode.deltaReportDataNodeInfo(request);
System.out.println("增量上报DataNode信息:" + filename + ", " + filesize);
}
//...
}
五、总结
本章,我对分布式文件系统的存储架构进行了讲解。我们在设计文件上传和下载功能时,需要考虑元数据管理、DataNode的可用性、数据传输方式、DataNode节点选取策略等各方面的内容。
下一章,我将对DataNode如何基于Java NIO完成文件的上传和下载进行讲解。