File Upload Source Code Analysis

First, the sequence diagram of the method-call flow for file upload:

The main execution steps:

1. FileSystem initializes; the Client obtains a NameNodeRpcServer proxy object and establishes RPC communication with the NameNode

2. FileSystem's create() is called; since the implementation class is DistributedFileSystem, the create() in that class runs

3. DistributedFileSystem holds a reference to DFSClient and delegates to DFSClient's create()

4. DFSOutputStream's static newStreamForCreate() method calls create() on the server-side NameNodeRpcServer, then constructs and returns a DFSOutputStream output stream object

5. Hadoop's IOUtils utility class copies the local file's contents into the output stream
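The delegation chain in steps 1-4 can be sketched as a minimal, self-contained model. The class names below are simplified stand-ins for illustration only, not the real Hadoop classes; the point is only the shape of the call chain: FileSystem.create() dispatches to DistributedFileSystem.create(), which delegates to its DFSClient, which in turn calls DFSOutputStream's static factory.

```java
// Simplified stand-ins (NOT the real Hadoop classes) showing the delegation
// chain: FileSystem -> DistributedFileSystem -> DFSClient -> DFSOutputStream.
abstract class FileSystemSketch {
    abstract String create(String path);
}

class DfsOutputStreamSketch {
    final String path;
    DfsOutputStreamSketch(String path) { this.path = path; }
    static DfsOutputStreamSketch newStreamForCreate(String path) {
        // In real HDFS, this is where the NameNode RPC create() happens.
        return new DfsOutputStreamSketch(path);
    }
}

class DfsClientSketch {
    DfsOutputStreamSketch create(String path) {
        // Delegate to the static factory, as DFSClient.create() does.
        return DfsOutputStreamSketch.newStreamForCreate(path);
    }
}

class DistributedFileSystemSketch extends FileSystemSketch {
    private final DfsClientSketch dfs = new DfsClientSketch(); // held reference
    @Override
    String create(String path) {
        return dfs.create(path).path; // delegate to the DFSClient
    }
}
```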

Now let's look at the source code.

First, the file system is initialized and RPC communication with the server is established.

Then FileSystem's create() is called. Since FileSystem is an abstract class, this actually dispatches to a subclass's create() implementation:

public abstract FSDataOutputStream create(
    Path var1,
    FsPermission var2,
    boolean var3,
    int var4,
    short var5,
    long var6,
    Progressable var8) throws IOException;

As mentioned earlier, FileSystem.get() returns a DistributedFileSystem object, which the following snippet verifies, so we can go straight into DistributedFileSystem:

FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
// Obtain the HDFS operation handle: a FileSystem object
FileSystem fileSystem = FileSystem.get(conf);
// DFS[DFSClient[clientName=DFSClient_NONMAPREDUCE_-22357267_1, ugi=hadoop (auth:SIMPLE)]]
System.out.println(fileSystem);

// Prints true: the object returned is a DistributedFileSystem
System.out.println(fs instanceof DistributedFileSystem);
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize,
    final Progressable progress, final ChecksumOpt checksumOpt)
    throws IOException {
  statistics.incrementWriteOps(1);
  storageStatistics.incrementOpCounter(OpType.CREATE);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p) throws IOException {
      // dfs is the DFSClient held by DistributedFileSystem,
      // so this calls DFSClient.create()
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
          cflags, replication, blockSize, progress, bufferSize,
          checksumOpt);
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }
    @Override
    public FSDataOutputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.create(p, permission, cflags, bufferSize,
          replication, blockSize, progress, checksumOpt);
    }
  }.resolve(this, absF);
}

DFSClient's create() returns a DFSOutputStream object:

public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
    String ecPolicyName) throws IOException {
  checkOpen();
  final FsPermission masked = applyUMask(permission);
  LOG.debug("{}: masked={}", src, masked);
  // Call DFSOutputStream's static newStreamForCreate() method,
  // which returns the output stream
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
      src, masked, flag, createParent, replication, blockSize, progress,
      dfsClientConf.createChecksum(checksumOpt),
      getFavoredNodesStr(favoredNodes), ecPolicyName);
  beginFileLease(result.getFileId(), result);
  return result;
}

Let's continue with the logic inside newStreamForCreate():

static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
    throws IOException {
  try (TraceScope ignored =
      dfsClient.newPathTraceScope("newStreamForCreate", src)) {
    HdfsFileStatus stat = null;

    // Retry the create if we get a RetryStartFileException up to a maximum
    // number of times
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        // Call the create() method implemented by NameNodeRpcServer
        // through dfsClient's namenode proxy object
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
        break;
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(
            AccessControlException.class,
            DSQuotaExceededException.class,
            QuotaByStorageTypeExceededException.class,
            FileAlreadyExistsException.class,
            FileNotFoundException.class,
            ParentNotDirectoryException.class,
            NSQuotaExceededException.class,
            RetryStartFileException.class,
            SafeModeException.class,
            UnresolvedPathException.class,
            SnapshotAccessControlException.class,
            UnknownCryptoProtocolVersionException.class);
        if (e instanceof RetryStartFileException) {
          if (retryCount > 0) {
            shouldRetry = true;
            retryCount--;
          } else {
            throw new IOException("Too many retries because of encryption" +
                " zone operations", e);
          }
        } else {
          throw e;
        }
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    // Construct the output stream object
    final DFSOutputStream out;
    if (stat.getErasureCodingPolicy() != null) {
      out = new DFSStripedOutputStream(dfsClient, src, stat,
          flag, progress, checksum, favoredNodes);
    } else {
      out = new DFSOutputStream(dfsClient, src, stat,
          flag, progress, checksum, favoredNodes, true);
    }
    // start() kicks off the inner DataStreamer class. DataStreamer extends
    // Thread, so this is a background thread that requests new block info
    // from the NameNode; the pipeline writing mentioned earlier is also
    // implemented here.
    out.start();
    return out;
  }
}
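The bounded-retry pattern at the heart of newStreamForCreate() can be sketched in isolation. This is a self-contained illustration, not the Hadoop code: RetryableException stands in for RetryStartFileException, and the constant value is only an assumption for the sketch. The key points mirrored here are that only one recoverable exception type triggers a retry, retries are bounded by a counter, and every other failure propagates immediately.

```java
import java.util.function.Supplier;

// Stand-in for Hadoop's RetryStartFileException.
class RetryableException extends RuntimeException {}

class CreateRetry {
    static final int CREATE_RETRY_COUNT = 10; // illustrative value

    // Run the call; retry only on RetryableException, up to the bound.
    static <T> T callWithRetry(Supplier<T> call) {
        int retryCount = CREATE_RETRY_COUNT;
        boolean shouldRetry = true;
        T result = null;
        while (shouldRetry) {
            shouldRetry = false;
            try {
                result = call.get();
            } catch (RetryableException e) {
                if (retryCount > 0) {
                    shouldRetry = true;
                    retryCount--;
                } else {
                    throw new RuntimeException("Too many retries", e);
                }
            }
            // Any other exception type propagates to the caller unchanged.
        }
        return result;
    }
}
```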

At this point the Client holds the server-side output stream object. What remains is straightforward: simple file input/output stream operations (via Hadoop's IOUtils).

File Download Source Code Analysis

The overall flow of file download is similar to upload. As before, let's start with the method-call sequence diagram:

Main execution steps:

  1. FileSystem initializes; the Client obtains a NameNodeRpcServer proxy object and establishes RPC communication with the NameNode (same as before)

  2. FileSystem's open() is called; since the implementation class is DistributedFileSystem, the open() in that class runs

  3. DistributedFileSystem holds a reference to DFSClient and delegates to DFSClient's open()

  4. A DFSInputStream input stream is instantiated

  5. Its openInfo() method is called

  6. openInfo() calls fetchLocatedBlocksAndGetLastBlockLength() to fetch the block information and get the length of the last block

  7. That method calls DFSClient's getLocatedBlocks() to obtain the block information

  8. Inside callGetBlockLocations(), the NameNode proxy object invokes NameNodeRpcServer's getBlockLocations()

  9. The block information is written into the input stream

  10. IOUtils takes over and downloads the file to the local machine

Next, let's look at the source.

First is still the FileSystem initialization, which we covered above and won't repeat here; we'll start directly from DistributedFileSystem's open().

@Override
public FSDataInputStream open(PathHandle fd, int bufferSize)
    throws IOException {
  statistics.incrementReadOps(1);
  storageStatistics.incrementOpCounter(OpType.OPEN);
  if (!(fd instanceof HdfsPathHandle)) {
    fd = new HdfsPathHandle(fd.bytes());
  }
  HdfsPathHandle id = (HdfsPathHandle) fd;
  // dfs is the DFSClient object; its open() returns the input stream
  final DFSInputStream dfsis = dfs.open(id, bufferSize, verifyChecksum);
  return dfs.createWrappedInputStream(dfsis);
}

DFSClient does not use the NameNode proxy object directly here; instead it passes itself to DFSInputStream:

public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
    throws IOException {
  checkOpen();
  // Get block info from namenode
  try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) {
    LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0);
    return openInternal(locatedBlocks, src, verifyChecksum);
  }
}

private DFSInputStream openInternal(LocatedBlocks locatedBlocks, String src,
    boolean verifyChecksum) throws IOException {
  if (locatedBlocks != null) {
    ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy();
    if (ecPolicy != null) {
      return new DFSStripedInputStream(this, src, verifyChecksum, ecPolicy,
          locatedBlocks);
    }
    // DFSClient does not call the server through the NameNode proxy here;
    // it constructs the input stream directly, passing itself as a parameter
    return new DFSInputStream(this, src, verifyChecksum, locatedBlocks);
  } else {
    throw new IOException("Cannot open filename " + src);
  }
}

Accordingly, DFSInputStream must hold a reference to the DFSClient:

// DFSInputStream.java constructor
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
    LocatedBlocks locatedBlocks) throws IOException {
  this.dfsClient = dfsClient; // the reference to DFSClient
  this.verifyChecksum = verifyChecksum;
  this.src = src;
  synchronized (infoLock) {
    this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
  }
  this.locatedBlocks = locatedBlocks;
  openInfo(false);
}

openInfo() fetches the block information:

// DFSInputStream.java
void openInfo(boolean refreshLocatedBlocks) throws IOException {
  final DfsClientConf conf = dfsClient.getConf();
  synchronized (infoLock) {
    // Fetch the block information
    lastBlockBeingWrittenLength =
        fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
    // Number of fetch retries from the configuration; the default is 3
    int retriesForLastBlockLength = conf.getRetryTimesForGetLastBlockLength();
    while (retriesForLastBlockLength > 0) {
      // Getting last block length as -1 is a special case. When cluster
      // restarts, DNs may not report immediately. At this time partial block
      // locations will not be available with NN for getting the length. Lets
      // retry for 3 times to get the length.
      if (lastBlockBeingWrittenLength == -1) {
        DFSClient.LOG.warn("Last block locations not available. "
            + "Datanodes might not have reported blocks completely."
            + " Will retry for " + retriesForLastBlockLength + " times");
        waitFor(conf.getRetryIntervalForGetLastBlockLength());
        lastBlockBeingWrittenLength =
            fetchLocatedBlocksAndGetLastBlockLength(true);
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    if (lastBlockBeingWrittenLength == -1
        && retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }
}
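The "length == -1 means not reported yet" retry above can be reduced to a small self-contained sketch. This is an illustration under simplifying assumptions, not the Hadoop code: the fetch is modeled as a LongSupplier, the wait between attempts is omitted, and -1 is the sentinel for "DataNodes have not reported the last block yet".

```java
import java.io.IOException;
import java.util.function.LongSupplier;

class LastBlockLengthRetry {
    // Re-fetch a bounded number of times while the value is the -1 sentinel,
    // mirroring openInfo()'s wait for DataNode block reports.
    static long fetchWithRetry(LongSupplier fetch, int retries)
            throws IOException {
        long len = fetch.getAsLong();
        while (retries > 0) {
            if (len == -1) {
                // In the real code there is a configurable sleep here.
                len = fetch.getAsLong();
            } else {
                break;
            }
            retries--;
        }
        if (len == -1 && retries == 0) {
            throw new IOException("Could not obtain the last block locations.");
        }
        return len;
    }
}
```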

Fetching the block information:

private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
    throws IOException {
  LocatedBlocks newInfo = locatedBlocks;
  // Go back to DFSClient to fetch the current block information
  if (locatedBlocks == null || refresh) {
    newInfo = dfsClient.getLocatedBlocks(src, 0);
  }
  DFSClient.LOG.debug("newInfo = {}", newInfo);
  if (newInfo == null) {
    throw new IOException("Cannot open filename " + src);
  }

  if (locatedBlocks != null) {
    Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
    Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
    while (oldIter.hasNext() && newIter.hasNext()) {
      if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
        throw new IOException("Blocklist for " + src + " has changed!");
      }
    }
  }
  locatedBlocks = newInfo;
  long lastBlockBeingWrittenLength = 0;
  if (!locatedBlocks.isLastBlockComplete()) {
    final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
    if (last != null) {
      if (last.getLocations().length == 0) {
        if (last.getBlockSize() == 0) {
          // if the length is zero, then no data has been written to
          // datanode. So no need to wait for the locations.
          return 0;
        }
        return -1;
      }
      final long len = readBlockLength(last);
      last.getBlock().setNumBytes(len);
      lastBlockBeingWrittenLength = len;
    }
  }

  fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
  // Return the length of the last block still being written
  return lastBlockBeingWrittenLength;
}
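The lock-step consistency check in the middle of that method is worth isolating: when the block list is refreshed, the old and new lists are walked together, and any mismatch means the file was replaced or rewritten underneath the reader. The sketch below models block IDs as plain longs instead of LocatedBlock objects, purely for illustration; note that, like the original, it only compares the common prefix, so newly appended blocks do not trigger the error.

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

class BlockListCheck {
    // Walk both lists in lock-step; a differing block means the file changed.
    static void checkUnchanged(List<Long> oldBlocks, List<Long> newBlocks)
            throws IOException {
        Iterator<Long> oldIter = oldBlocks.iterator();
        Iterator<Long> newIter = newBlocks.iterator();
        while (oldIter.hasNext() && newIter.hasNext()) {
            if (!oldIter.next().equals(newIter.next())) {
                throw new IOException("Blocklist has changed!");
            }
        }
        // Extra blocks on either side (e.g. an append) are tolerated.
    }
}
```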

Back in DFSClient:

@VisibleForTesting
public LocatedBlocks getLocatedBlocks(String src, long start, long length)
    throws IOException {
  try (TraceScope ignored = newPathTraceScope("getBlockLocations", src)) {
    // namenode here is passed as a parameter into callGetBlockLocations()
    return callGetBlockLocations(namenode, src, start, length);
  }
}

The server-side method is then called to return the block information:

static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
    String src, long start, long length)
    throws IOException {
  try {
    // Return the block information
    return namenode.getBlockLocations(src, start, length);
  } catch (RemoteException re) {
    throw re.unwrapRemoteException(AccessControlException.class,
        FileNotFoundException.class,
        UnresolvedPathException.class);
  }
}

Finally, the file's block data is read through the input stream and copied to a local file via the IOUtils utility class.
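What Hadoop's IOUtils.copyBytes does at that final step boils down to pumping a fixed-size buffer from the input stream to the output stream until EOF. The following is a plain java.io sketch of that behavior, not the Hadoop implementation itself (it omits IOUtils's stream-closing option):

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

class CopySketch {
    // Copy everything from in to out through a bufferSize-byte buffer,
    // returning the total number of bytes transferred.
    static long copyBytes(InputStream in, OutputStream out, int bufferSize)
            throws IOException {
        byte[] buf = new byte[bufferSize];
        long total = 0;
        int n;
        while ((n = in.read(buf)) != -1) { // -1 signals EOF
            out.write(buf, 0, n);
            total += n;
        }
        return total;
    }
}
```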

This article is from the WeChat official account: JS ONE 技术联盟