File Upload Source Code Analysis

Let's start with a sequence diagram of the method-call flow for file upload.
The main steps in the process are:

1. FileSystem is initialized; the client obtains a NameNodeRpcServer proxy object and establishes RPC communication with the NameNode.
2. FileSystem's create() is called; since the concrete implementation is DistributedFileSystem, it is that class's create() that runs.
3. DistributedFileSystem holds a reference to DFSClient and delegates to DFSClient's create().
4. The static newStreamForCreate() method of DFSOutputStream calls create() on the server-side NameNodeRpcServer, then constructs and returns a DFSOutputStream object.
5. Hadoop's IOUtils utility class is used to copy the local file into the returned output stream.
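Putting the five steps together, a typical client program looks roughly like this (a sketch only: the NameNode URI `hdfs://hadoop01:9000` and user name `hadoop` come from this article's test environment, and the local/remote paths are placeholders):

```java
import java.io.InputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class UploadSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Step 1: FileSystem.get() returns a DistributedFileSystem and sets up
        // the RPC proxy to the NameNode.
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
        // Steps 2-4: create() goes DistributedFileSystem -> DFSClient ->
        // DFSOutputStream.newStreamForCreate() -> NameNodeRpcServer.create().
        FSDataOutputStream out = fs.create(new Path("/upload/a.txt"));
        // Step 5: stream the local file into the returned output stream.
        InputStream in = Files.newInputStream(Paths.get("/tmp/a.txt"));
        IOUtils.copyBytes(in, out, 4096, true); // true: close both streams
    }
}
```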
Now let's look at the source code.

First the file system is initialized, establishing RPC communication with the server side.

Next, FileSystem's create() is called. Since FileSystem is an abstract class, what actually runs is a subclass's implementation of create():
```java
public abstract FSDataOutputStream create(Path var1,
    FsPermission var2,
    boolean var3,
    int var4,
    short var5,
    long var6,
    Progressable var8) throws IOException;
```
As noted earlier, FileSystem.get() returns a DistributedFileSystem object, which a quick check confirms:
```java
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
FileSystem fileSystem = FileSystem.get(conf);
System.out.println(fileSystem);
System.out.println(fs instanceof DistributedFileSystem); // prints true
```

So we can go straight into DistributedFileSystem's create():
```java
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
    final EnumSet<CreateFlag> cflags, final int bufferSize,
    final short replication, final long blockSize,
    final Progressable progress, final ChecksumOpt checksumOpt)
    throws IOException {
  statistics.incrementWriteOps(1);
  storageStatistics.incrementOpCounter(OpType.CREATE);
  Path absF = fixRelativePart(f);
  return new FileSystemLinkResolver<FSDataOutputStream>() {
    @Override
    public FSDataOutputStream doCall(final Path p) throws IOException {
      final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
          cflags, replication, blockSize, progress, bufferSize, checksumOpt);
      return dfs.createWrappedOutputStream(dfsos, statistics);
    }
    @Override
    public FSDataOutputStream next(final FileSystem fs, final Path p)
        throws IOException {
      return fs.create(p, permission, cflags, bufferSize,
          replication, blockSize, progress, checksumOpt);
    }
  }.resolve(this, absF);
}
```
DFSClient's create() returns a DFSOutputStream object:
```java
public DFSOutputStream create(String src, FsPermission permission,
    EnumSet<CreateFlag> flag, boolean createParent, short replication,
    long blockSize, Progressable progress, int buffersize,
    ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
    String ecPolicyName) throws IOException {
  checkOpen();
  final FsPermission masked = applyUMask(permission);
  LOG.debug("{}: masked={}", src, masked);
  final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
      src, masked, flag, createParent, replication, blockSize, progress,
      dfsClientConf.createChecksum(checksumOpt),
      getFavoredNodesStr(favoredNodes), ecPolicyName);
  beginFileLease(result.getFileId(), result);
  return result;
}
```
Let's continue into the logic of newStreamForCreate():
```java
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
    FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
    short replication, long blockSize, Progressable progress,
    DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
    throws IOException {
  try (TraceScope ignored =
      dfsClient.newPathTraceScope("newStreamForCreate", src)) {
    HdfsFileStatus stat = null;
    // Retry the create RPC a bounded number of times; only
    // RetryStartFileException (encryption zone races) triggers a retry.
    boolean shouldRetry = true;
    int retryCount = CREATE_RETRY_COUNT;
    while (shouldRetry) {
      shouldRetry = false;
      try {
        stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
            new EnumSetWritable<>(flag), createParent, replication,
            blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
        break;
      } catch (RemoteException re) {
        IOException e = re.unwrapRemoteException(
            AccessControlException.class,
            DSQuotaExceededException.class,
            QuotaByStorageTypeExceededException.class,
            FileAlreadyExistsException.class,
            FileNotFoundException.class,
            ParentNotDirectoryException.class,
            NSQuotaExceededException.class,
            RetryStartFileException.class,
            SafeModeException.class,
            UnresolvedPathException.class,
            SnapshotAccessControlException.class,
            UnknownCryptoProtocolVersionException.class);
        if (e instanceof RetryStartFileException) {
          if (retryCount > 0) {
            shouldRetry = true;
            retryCount--;
          } else {
            throw new IOException("Too many retries because of encryption"
                + " zone operations", e);
          }
        } else {
          throw e;
        }
      }
    }
    Preconditions.checkNotNull(stat, "HdfsFileStatus should not be null!");
    final DFSOutputStream out;
    if (stat.getErasureCodingPolicy() != null) {
      out = new DFSStripedOutputStream(dfsClient, src, stat, flag, progress,
          checksum, favoredNodes);
    } else {
      out = new DFSOutputStream(dfsClient, src, stat, flag, progress,
          checksum, favoredNodes, true);
    }
    out.start();
    return out;
  }
}
```
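The retry handling above can be distilled into a small self-contained sketch (plain Java, with a hypothetical `CreateCall` interface standing in for the NameNode RPC): the call is retried only for a designated retryable exception type, up to a fixed count, and any other failure propagates immediately.

```java
import java.io.IOException;

public class RetryLoopSketch {
    // Stand-in for the retryable case (RetryStartFileException in the real code).
    static class RetryableException extends IOException {}

    interface CreateCall { String run() throws IOException; }

    // Mirrors the shouldRetry/retryCount loop in newStreamForCreate():
    // retry only on RetryableException, at most maxRetries extra attempts.
    static String createWithRetry(CreateCall call, int maxRetries)
            throws IOException {
        int retryCount = maxRetries;
        boolean shouldRetry = true;
        String result = null;
        while (shouldRetry) {
            shouldRetry = false;
            try {
                result = call.run();
            } catch (RetryableException e) {
                if (retryCount > 0) {
                    shouldRetry = true;
                    retryCount--;
                } else {
                    throw new IOException("Too many retries", e);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws IOException {
        int[] failures = {2}; // fail twice, then succeed
        String stat = createWithRetry(() -> {
            if (failures[0]-- > 0) throw new RetryableException();
            return "created";
        }, 10);
        System.out.println(stat); // prints "created"
    }
}
```

Note the same shape as the real code: the loop exits either by `break` on a successful RPC or by exhausting `retryCount`, so `stat` is guaranteed non-null afterwards (which is what the `Preconditions.checkNotNull` asserts).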
At this point the client holds the server-backed output stream object, and the rest is straightforward: simple input/output stream operations using Hadoop's IOUtils.
File Download Source Code Analysis

The overall flow of file download is similar to upload. As before, let's start with the sequence diagram of the method calls.

The main steps:

1. FileSystem is initialized; the client obtains a NameNodeRpcServer proxy object and establishes RPC communication with the NameNode (same as before).
2. FileSystem's open() is called; since the implementation class is DistributedFileSystem, it is that class's open() that runs.
3. DistributedFileSystem holds a reference to DFSClient and delegates to DFSClient's open().
4. A DFSInputStream input stream is instantiated.
5. openInfo() is called.
6. fetchLocatedBlocksAndGetLastBlockLength() is called to fetch the block information and obtain the length of the last block.
7. DFSClient's getLocatedBlocks() is called to get the block information.
8. Inside callGetBlockLocations(), the NameNode proxy object invokes NameNodeRpcServer's getBlockLocations().
9. The block information is handed to the input stream.
10. IOUtils takes over and downloads the file to the local machine.
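The download steps above correspond to a client program of roughly this shape (a sketch only: URI, user name, and paths are placeholders from this article's environment):

```java
import java.io.OutputStream;
import java.net.URI;
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class DownloadSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Step 1: set up the RPC proxy to the NameNode.
        FileSystem fs = FileSystem.get(new URI("hdfs://hadoop01:9000"), conf, "hadoop");
        // Steps 2-9: open() goes DistributedFileSystem -> DFSClient.open() ->
        // DFSInputStream, which fetches the block locations via openInfo().
        FSDataInputStream in = fs.open(new Path("/upload/a.txt"));
        // Step 10: stream the blocks into a local file.
        OutputStream out = Files.newOutputStream(Paths.get("/tmp/a_copy.txt"));
        IOUtils.copyBytes(in, out, 4096, true); // true: close both streams
    }
}
```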
Now let's look at the source code.

The FileSystem initialization is the same as before, so it won't be repeated here; we'll start directly from DistributedFileSystem's open().
```java
@Override
public FSDataInputStream open(PathHandle fd, int bufferSize)
    throws IOException {
  statistics.incrementReadOps(1);
  storageStatistics.incrementOpCounter(OpType.OPEN);
  if (!(fd instanceof HdfsPathHandle)) {
    fd = new HdfsPathHandle(fd.bytes());
  }
  HdfsPathHandle id = (HdfsPathHandle) fd;
  final DFSInputStream dfsis = dfs.open(id, bufferSize, verifyChecksum);
  return dfs.createWrappedInputStream(dfsis);
}
```
DFSClient does not use the NameNode proxy object directly here; instead, the fetched block information is passed on to DFSInputStream:
```java
public DFSInputStream open(String src, int buffersize,
    boolean verifyChecksum) throws IOException {
  checkOpen();
  try (TraceScope ignored = newPathTraceScope("newDFSInputStream", src)) {
    LocatedBlocks locatedBlocks = getLocatedBlocks(src, 0);
    return openInternal(locatedBlocks, src, verifyChecksum);
  }
}

private DFSInputStream openInternal(LocatedBlocks locatedBlocks, String src,
    boolean verifyChecksum) throws IOException {
  if (locatedBlocks != null) {
    ErasureCodingPolicy ecPolicy = locatedBlocks.getErasureCodingPolicy();
    if (ecPolicy != null) {
      return new DFSStripedInputStream(this, src, verifyChecksum, ecPolicy,
          locatedBlocks);
    }
    return new DFSInputStream(this, src, verifyChecksum, locatedBlocks);
  } else {
    throw new IOException("Cannot open filename " + src);
  }
}
```
DFSInputStream therefore must hold a reference to DFSClient:
```java
DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum,
    LocatedBlocks locatedBlocks) throws IOException {
  this.dfsClient = dfsClient;
  this.verifyChecksum = verifyChecksum;
  this.src = src;
  synchronized (infoLock) {
    this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
  }
  this.locatedBlocks = locatedBlocks;
  openInfo(false);
}
```
openInfo() fetches the block information:
```java
void openInfo(boolean refreshLocatedBlocks) throws IOException {
  final DfsClientConf conf = dfsClient.getConf();
  synchronized (infoLock) {
    lastBlockBeingWrittenLength =
        fetchLocatedBlocksAndGetLastBlockLength(refreshLocatedBlocks);
    int retriesForLastBlockLength =
        conf.getRetryTimesForGetLastBlockLength();
    while (retriesForLastBlockLength > 0) {
      // -1 means the last block's locations are not yet available;
      // wait and retry a bounded number of times.
      if (lastBlockBeingWrittenLength == -1) {
        DFSClient.LOG.warn("Last block locations not available. "
            + "Datanodes might not have reported blocks completely."
            + " Will retry for " + retriesForLastBlockLength + " times");
        waitFor(conf.getRetryIntervalForGetLastBlockLength());
        lastBlockBeingWrittenLength =
            fetchLocatedBlocksAndGetLastBlockLength(true);
      } else {
        break;
      }
      retriesForLastBlockLength--;
    }
    if (lastBlockBeingWrittenLength == -1
        && retriesForLastBlockLength == 0) {
      throw new IOException("Could not obtain the last block locations.");
    }
  }
}
```
Fetching the block information:
```java
private long fetchLocatedBlocksAndGetLastBlockLength(boolean refresh)
    throws IOException {
  LocatedBlocks newInfo = locatedBlocks;
  if (locatedBlocks == null || refresh) {
    newInfo = dfsClient.getLocatedBlocks(src, 0);
  }
  DFSClient.LOG.debug("newInfo = {}", newInfo);
  if (newInfo == null) {
    throw new IOException("Cannot open filename " + src);
  }
  if (locatedBlocks != null) {
    Iterator<LocatedBlock> oldIter =
        locatedBlocks.getLocatedBlocks().iterator();
    Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
    while (oldIter.hasNext() && newIter.hasNext()) {
      if (!oldIter.next().getBlock().equals(newIter.next().getBlock())) {
        throw new IOException("Blocklist for " + src + " has changed!");
      }
    }
  }
  locatedBlocks = newInfo;
  long lastBlockBeingWrittenLength = 0;
  if (!locatedBlocks.isLastBlockComplete()) {
    final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
    if (last != null) {
      if (last.getLocations().length == 0) {
        if (last.getBlockSize() == 0) {
          // No locations and zero size: treat as an empty last block.
          return 0;
        }
        // Non-empty block with no known locations: signal the caller to retry.
        return -1;
      }
      final long len = readBlockLength(last);
      last.getBlock().setNumBytes(len);
      lastBlockBeingWrittenLength = len;
    }
  }
  fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
  return lastBlockBeingWrittenLength;
}
```
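The heart of the consistency check above, walking the old and new block lists in step and failing as soon as any block identity differs, can be sketched in isolation (plain Java, with block IDs modeled as `Long`s instead of `LocatedBlock` objects):

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public class BlockListCheckSketch {
    // Mirrors the oldIter/newIter walk in
    // fetchLocatedBlocksAndGetLastBlockLength(): iterate both lists in
    // lockstep and throw the moment a previously seen block has changed.
    // Extra blocks appended at the end of the new list are allowed.
    static void checkUnchanged(List<Long> oldBlocks, List<Long> newBlocks,
            String src) throws IOException {
        Iterator<Long> oldIter = oldBlocks.iterator();
        Iterator<Long> newIter = newBlocks.iterator();
        while (oldIter.hasNext() && newIter.hasNext()) {
            if (!oldIter.next().equals(newIter.next())) {
                throw new IOException("Blocklist for " + src + " has changed!");
            }
        }
    }

    public static void main(String[] args) throws IOException {
        // OK: a new block was appended while the file was being written.
        checkUnchanged(List.of(1L, 2L), List.of(1L, 2L, 3L), "/f");
        try {
            // Fails: block 2 was replaced by block 9.
            checkUnchanged(List.of(1L, 2L), List.of(1L, 9L), "/f");
        } catch (IOException e) {
            System.out.println(e.getMessage());
        }
    }
}
```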
Back in DFSClient:
```java
@VisibleForTesting
public LocatedBlocks getLocatedBlocks(String src, long start, long length)
    throws IOException {
  try (TraceScope ignored = newPathTraceScope("getBlockLocations", src)) {
    return callGetBlockLocations(namenode, src, start, length);
  }
}
```
This calls the server-side method and returns the block information:
```java
static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
    String src, long start, long length) throws IOException {
  try {
    return namenode.getBlockLocations(src, start, length);
  } catch (RemoteException re) {
    throw re.unwrapRemoteException(AccessControlException.class,
        FileNotFoundException.class,
        UnresolvedPathException.class);
  }
}
```
Finally, the file's block information is loaded into the input stream, and the IOUtils utility class writes the data out to a local file.
This article comes from the WeChat official account: JS ONE 技术联盟.