distcp流程分析 您所在的位置:网站首页 distcp命令参数 distcp流程分析

distcp流程分析

2024-02-02 20:38| 来源: 网络整理| 查看: 265

文章目录 背景使用1. shell1. 文件复制2. 文件夹复制多文件夹复制 2. distcp源码分析准备工作执行metaFolderjob = createJob()map元数据生成(createInputFileListing)

背景

distcp可用于跨集群或集群内目录的复制,distcp参数不同复制的结果差别较大。本文结合官网及源码,对distcp流程进行分析,并结合测试给出验证。

使用 1. shell

目标目录父目录不存在时,可以自动建立多层目标目录。

1. 文件复制

文件复制时,相当于cp,因为hdfs无法并行写。将a.txt复制并重命令为a_bak.txt

$ hadoop distcp \ hdfs://cluster-host1:9000/v2-ec-source/2019/07/02/_SUCCESS \ hdfs://10.179.25.59:9000/v2-ec-source/2019/07/02/_SUCCESS_bak 2. 文件夹复制

distcp主要为复制文件夹服务。一个文件只能分配到一个map。因此,在文件夹中有多个文件时,可以发挥并行优势。

$ hadoop distcp \ hdfs://cluster-host1:9000/v2-ec-source/2019/07/02 \ hdfs://10.179.25.59:9000/v2-ec-source/2019/bak

将02下的所有文件,复制到bak目录下。

多文件夹复制

将多个文件夹复制到目标文件夹下,并且多个文件夹最后一个文件夹名保留。

$ hadoop distcp \ hdfs://cluster-host1:9000/v2-ec-source/2019/06 \ hdfs://cluster-host1:9000/v2-ec-source/2019/07 \ hdfs://10.179.25.59:9000/v2-ec-source/2019/mult

目标文件夹结果:

$ hadoop fs -ls /v2-ec-source/2019/mult ... 2019-10-23 20:44 /v2-ec-source/2019/mult/06 ... 2019-10-23 20:44 /v2-ec-source/2019/mult/07

当从多个源拷贝时,如果两个源冲突,DistCp会停止拷贝并提示出错信息, 如果在目的位置发生冲突,会根据选项设置解决。 默认情况会跳过已经存在的目标文件(比如不用源文件做替换操作)。每次操作结束时都会报告跳过的文件数目,但是如果某些拷贝操作失败了,但在之后的尝试成功了。

值得注意的是,当另一个客户端同时在向源文件写入时,拷贝很有可能会失败。 尝试覆盖HDFS上正在被写入的文件的操作也会失败。 如果一个源文件在拷贝之前被移动或删除了,拷贝失败同时输出异常 FileNotFoundException。

2. distcp源码分析

1.shell入口:

hadoop-common-project/hadoop-common/src/main/bin下以hadoop文件为入口:

... elif [ "$COMMAND" = "distcp" ] ; then CLASS=org.apache.hadoop.tools.DistCp CLASSPATH=${CLASSPATH}:${TOOL_PATH} ... # Always respect HADOOP_OPTS and HADOOP_CLIENT_OPTS HADOOP_OPTS="$HADOOP_OPTS $HADOOP_CLIENT_OPTS" #make sure security appender is turned off HADOOP_OPTS="$HADOOP_OPTS -Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,NullAppender}" export CLASSPATH=$CLASSPATH exec "$JAVA" $JAVA_HEAP_MAX $HADOOP_OPTS $CLASS "$@" ;;

进入 org.apache.hadoop.tools.DistCp: 我们看到Distcp是一个Tool、ToolRunner应用(如果不熟悉hadoop的ToolRunner模式,请参看本博客ToolRunner文章)。Tool应用要求实现run方法,如下:

public class DistCp extends Configured implements Tool { ... // Tool应用必须的run方法 @Override public int run(String[] argv) { // 下文具体分析时给出源码 } // main方法,也是shell的入口 public static void main(String argv[]) { int exitCode; try { DistCp distCp = new DistCp(); Cleanup CLEANUP = new Cleanup(distCp); ShutdownHookManager.get().addShutdownHook(CLEANUP, SHUTDOWN_HOOK_PRIORITY); exitCode = ToolRunner.run(getDefaultConf(), distCp, argv); } catch (Exception e) { LOG.error("Couldn't complete DistCp operation: ", e); exitCode = DistCpConstants.UNKNOWN_ERROR; } System.exit(exitCode); } } 准备工作

我们从main函数入手,先看main的准备工作。

构造器cleanup: private static class Cleanup implements Runnable { private final DistCp distCp; Cleanup(DistCp distCp) { this.distCp = distCp; } @Override public void run() { if (distCp.isSubmitted()) return; distCp.cleanup(); } } ... // 清理方法 private synchronized void cleanup() { try { if (metaFolder == null) return; jobFS.delete(metaFolder, true); metaFolder = null; } catch (IOException e) { LOG.error("Unable to cleanup meta folder: " + metaFolder, e); } }

如果distcp实例未提交任务,则删除metaFolder,并另metaFolder = null。 至于metaFolder作用,下文分析。

ShutdownHookManager工具类(可参看其他文章)。 执行

disctp使用ToolRunner.run执行任务。

exitCode = ToolRunner.run(getDefaultConf(), distCp, argv);

我们回到run方法中。

@Override public int run(String[] argv) { if (argv.length inputOptions = (OptionsParser.parse(argv)); setTargetPathExists(); LOG.info("Input Options: " + inputOptions); } catch (Throwable e) { LOG.error("Invalid arguments: ", e); System.err.println("Invalid arguments: " + e.getMessage()); OptionsParser.usage(); return DistCpConstants.INVALID_ARGUMENT; } try { execute(); } catch (InvalidInputException e) { LOG.error("Invalid input: ", e); return DistCpConstants.INVALID_ARGUMENT; } catch (DuplicateFileException e) { LOG.error("Duplicate files in input path: ", e); return DistCpConstants.DUPLICATE_INPUT; } catch (AclsNotSupportedException e) { LOG.error("ACLs not supported on at least one file system: ", e); return DistCpConstants.ACLS_NOT_SUPPORTED; } catch (XAttrsNotSupportedException e) { LOG.error("XAttrs not supported on at least one file system: ", e); return DistCpConstants.XATTRS_NOT_SUPPORTED; } catch (Exception e) { LOG.error("Exception encountered ", e); return DistCpConstants.UNKNOWN_ERROR; } return DistCpConstants.SUCCESS; } OptionsParser类是distcp单独实现的参数解析工具类。将输入参数解析成DistCpOptions inputOptions类型。如常见的参数overwrite = false等等。作为工具类,暂时忽略。setTargetPathExists():从参数中解析出目标路径。 /** * 为了优化copy,在输入选项中和job配置中,都加入了目标路径 */ private void setTargetPathExists() throws IOException { Path target = inputOptions.getTargetPath(); FileSystem targetFS = target.getFileSystem(getConf()); boolean targetExists = targetFS.exists(target); inputOptions.setTargetPathExists(targetExists); getConf().setBoolean(DistCpConstants.CONF_LABEL_TARGET_PATH_EXISTS, targetExists); } execute():核心执行方法。 前述的多数方法只是对路径等做一些检查以及对execute()方法做异常处理,而execute方法则是任务执行方法。 public Job execute() throws Exception { Job job = createAndSubmitJob(); if (inputOptions.shouldBlock()) { waitForJobCompletion(job); } return job; }

在execute()方法中,会调用createAndSubmitJob()创建MR任务,准备数据,设定数据输入格式,并把任务提交到hadoop集群运行,最后等待任务执行完毕。于是我们可以看到,主体功能实现就在createAndSubmitJob()这个函数体里,工程中其它的各个类无非都是为这个函数接口服务的。下面就是createAndSubmitJob()的代码,这里删除了一些不影响阅读的源码,只留下主体功能流程。

public Job createAndSubmitJob() throws Exception { assert inputOptions != null; assert getConf() != null; Job job = null; try { synchronized(this) { //Don't cleanup while we are setting up. metaFolder = createMetaFolderPath(); jobFS = metaFolder.getFileSystem(getConf()); job = createJob(); } if (inputOptions.shouldUseDiff()) { if (!DistCpSync.sync(inputOptions, getConf())) { inputOptions.disableUsingDiff(); } } createInputFileListing(job); job.submit(); submitted = true; } finally { if (!submitted) { cleanup(); } } String jobID = job.getJobID().toString(); job.getConfiguration().set(DistCpConstants.CONF_LABEL_DISTCP_JOB_ID, jobID); LOG.info("DistCp job-id: " + jobID); return job; } metaFolder

metaFolder是一个Path类型,private Path metaFolder; metafolder是DISTCP工具准备元数据的地方,在createMetaFolderPath()中会结合一个随机数生成一个工作目录,在这个目录中迟点会通过getFileListingPath()生成fileList.seq文件,然后往这个文件中写入数据,这是一个SequenceFile文件,即Key/Value结构的序列化文件,这个文件里将存放所有需要拷贝的源目录/文件信息列表。其中Key是源文件的Text格式的相对路径,即relPath;而Value则记录源文件的FileStatus格式的org.apache.hadoop.fs.FileStatus信息,这里FileStatus是hadoop已经封装好了的描述HDFS文件信息的类,但是DISTCP为了更好的处理数据,重新继承并封装了CopyListingFileStatus类,其描述如下图1,不过我们其实可以认为这里的Value就是FileStatus即可。metafolder目录中的fileList.seq最终会作为参数传递给MR任务中的Mapper。

private Path createMetaFolderPath() throws Exception { Configuration configuration = getConf(); Path stagingDir = JobSubmissionFiles.getStagingDir( new Cluster(configuration), configuration); Path metaFolderPath = new Path(stagingDir, PREFIX + String.valueOf(rand.nextInt())); if (LOG.isDebugEnabled()) LOG.debug("Meta folder location: " + metaFolderPath); configuration.set(DistCpConstants.CONF_LABEL_META_FOLDER, metaFolderPath.toString()); return metaFolderPath; }

开启debug模式后,我们可以看到metaFolder,并能查看内容:

19/12/31 12:08:25 DEBUG tools.DistCp: Meta folder location: /tmp/hadoop-yarn/staging/hadoop/.staging/_distcp714835269 Found 2 items -rw-r--r-- 2 hadoop supergroup 840 2019-11-03 17:48 /tmp/hadoop-yarn/staging/hadoop/.staging/_distcp987712852/fileList.seq -rw-r--r-- 2 hadoop supergroup 740 2019-11-03 17:48 /tmp/hadoop-yarn/staging/hadoop/.staging/_distcp987712852/fileList.seq_sorted job = createJob()

生成常规的MR job,源码如下:

private Job createJob() throws IOException { String jobName = "distcp"; String userChosenName = getConf().get(JobContext.JOB_NAME); if (userChosenName != null) jobName += ": " + userChosenName; Job job = Job.getInstance(getConf()); job.setJobName(jobName); job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions)); job.setJarByClass(CopyMapper.class); configureOutputFormat(job); job.setMapperClass(CopyMapper.class); job.setNumReduceTasks(0); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(Text.class); job.setOutputFormatClass(CopyOutputFormat.class); job.getConfiguration().set(JobContext.MAP_SPECULATIVE, "false"); job.getConfiguration().set(JobContext.NUM_MAPS, String.valueOf(inputOptions.getMaxMaps())); if (inputOptions.getSslConfigurationFile() != null) { setupSSLConfig(job); } inputOptions.appendToConf(job.getConfiguration()); return job; }

可以看到只有map作业,没有reduce。我们先看重点代码:

job.setInputFormatClass(DistCpUtils.getStrategy(getConf(), inputOptions));job.setMapperClass(CopyMapper.class);

我们在设置MapReduce输入格式的时候,会调用上面第一行这样一条语句,这条语句保证了输入文件会按照我们预设的格式被读取。setInputFormatClass里设定了Mapper的数据读取格式,也就是由getStrategy(getConf(), inputOptions)得到,进到这个函数里面,可以看到最终Mapper数据输入格式由UniformSizeInputFormat.class这个类定义的,而这个类继承自InputFormat.class,MR中所有的输入格式类都继承自InputFormat,这是一个抽象类。

public static Class getStrategy(Configuration conf, DistCpOptions options) { String confLabel = "distcp." + StringUtils.toLowerCase(options.getCopyStrategy()) + ".strategy" + ".impl"; return conf.getClass(confLabel, UniformSizeInputFormat.class, InputFormat.class); }

通过名称获取,默认情况confLabel = distcp.uniformsize.strategy.impl,getClass的第二个参数是默认值,即输入格式默认是UniformSizeInputFormat.class。

InputFormat抽象类仅有两个抽象方法:

ListgetSplits(),获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题。RecordReadercreateRecordReader(),创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题。

通过InputFormat,Mapreduce框架可以做到:

验证作业输入的正确性;将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的MapTask;提供RecordReader实现,读取InputSplit中的“K-V对”供Mapper使用。

在DISTCP中,UniformSizeInputFormat继承了InputFormat并实现了数据读入格式,它会读取metafolder中fileList.seq序列化文件的内容,并根据用户设定的map数和拷贝总数据量进行分片,计算出分多少片,最终提供“K-V”对供Mapper使用。这个类的源码实现并不复杂,加上注释一共也才100多行,很容易就能读懂。

CopyMapper.class中则定义了每个map的工作逻辑,也就是拷贝的核心逻辑,任务提交到hadoop集群中运行时每个map就是根据这个逻辑进行工作的,通过setMapperClass设定。这里要注意的是DISTCP任务只有map没有reduce,因为只需要map就可以完成拷贝数据的工作。CopyMapper的源码实现在org.apache.hadoop.tools.mapred这个包中,CopyMapper里最核心实现是setup()和map()这两个方法,这两个方法其实也是MR中Mapper的固有通用方法,setup()中完成map方法的一些初始化工作,在DISTCP中,这个方法里会设定对端的目标路径,并做一些参数设置和判断工作,源码(删掉了参数设置部分)如下:

@Override public void setup(Context context) throws IOException, InterruptedException { conf = context.getConfiguration(); syncFolders = conf.getBoolean(DistCpOptionSwitch.SYNC_FOLDERS.getConfigLabel(), false); ignoreFailures = conf.getBoolean(DistCpOptionSwitch.IGNORE_FAILURES.getConfigLabel(), false); skipCrc = conf.getBoolean(DistCpOptionSwitch.SKIP_CRC.getConfigLabel(), false); overWrite = conf.getBoolean(DistCpOptionSwitch.OVERWRITE.getConfigLabel(), false); append = conf.getBoolean(DistCpOptionSwitch.APPEND.getConfigLabel(), false); preserve = DistCpUtils.unpackAttributes(conf.get(DistCpOptionSwitch. PRESERVE_STATUS.getConfigLabel())); targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); Path targetFinalPath = new Path(conf.get( DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); targetFS = targetFinalPath.getFileSystem(conf); // 目标路径存在且是一个文件,则覆盖掉 if (targetFS.exists(targetFinalPath) && targetFS.isFile(targetFinalPath)) { overWrite = true; // When target is an existing file, overwrite it. } if (conf.get(DistCpConstants.CONF_LABEL_SSL_CONF) != null) { initializeSSLConf(context); } } map @Override public void map(Text relPath, CopyListingFileStatus sourceFileStatus, Context context) throws IOException, InterruptedException { Path sourcePath = sourceFileStatus.getPath(); if (LOG.isDebugEnabled()) LOG.debug("DistCpMapper::map(): Received " + sourcePath + ", " + relPath); Path target = new Path(targetWorkPath.makeQualified(targetFS.getUri(), targetFS.getWorkingDirectory()) + relPath.toString()); EnumSet fileAttributes = getFileAttributeSettings(context); final boolean preserveRawXattrs = context.getConfiguration().getBoolean( DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false); final String description = "Copying " + sourcePath + " to " + target; context.setStatus(description); LOG.info(description); try { CopyListingFileStatus sourceCurrStatus; FileSystem sourceFS; try { sourceFS = sourcePath.getFileSystem(conf); final boolean preserveXAttrs = fileAttributes.contains(FileAttribute.XATTR); sourceCurrStatus = DistCpUtils.toCopyListingFileStatus(sourceFS, sourceFS.getFileStatus(sourcePath), fileAttributes.contains(FileAttribute.ACL), preserveXAttrs, preserveRawXattrs); } catch (FileNotFoundException e) { throw new IOException(new RetriableFileCopyCommand.CopyReadException(e)); } FileStatus targetStatus = null; try { targetStatus = targetFS.getFileStatus(target); } catch (FileNotFoundException ignore) { if (LOG.isDebugEnabled()) LOG.debug("Path could not be found: " + target, ignore); } if (targetStatus != null && (targetStatus.isDirectory() != sourceCurrStatus.isDirectory())) { throw new IOException("Can't replace " + target + ". Target is " + getFileType(targetStatus) + ", Source is " + getFileType(sourceCurrStatus)); } if (sourceCurrStatus.isDirectory()) { createTargetDirsWithRetry(description, target, context); return; } FileAction action = checkUpdate(sourceFS, sourceCurrStatus, target); if (action == FileAction.SKIP) { LOG.info("Skipping copy of " + sourceCurrStatus.getPath() + " to " + target); updateSkipCounters(context, sourceCurrStatus); context.write(null, new Text("SKIP: " + sourceCurrStatus.getPath())); } else { copyFileWithRetry(description, sourceCurrStatus, target, context, action, fileAttributes); } DistCpUtils.preserve(target.getFileSystem(conf), target, sourceCurrStatus, fileAttributes, preserveRawXattrs); } catch (IOException exception) { handleFailures(exception, sourceFileStatus, target, context); } }

从输入参数可以看出来,这其实就是对上面分析过的UniformSizeInputFormat类里分片后的数据里的每一行进行处理,每行里存放的就是“K-V”对,也就是fileList.seq文件每行的内容。Map方法体前半部分有一大堆代码内容,其实都是一些准备和判断工作,为后面的拷贝服务,最终的拷贝动作发生在copyFileWithRetry(description,sourceCurrStatus, target, context, action, fileAttributes)这个函数中,进入这个函数一直往里面读,就能看到数据最终通过常用的Java输入输出流的方式完成点对点拷贝,最后拷贝方法源码如下:

private void copyFileWithRetry(String description, FileStatus sourceFileStatus, Path target, Context context, FileAction action, EnumSet fileAttributes) throws IOException { long bytesCopied; try { bytesCopied = (Long) new RetriableFileCopyCommand(skipCrc, description, action).execute(sourceFileStatus, target, context, fileAttributes); } catch (Exception e) { context.setStatus("Copy Failure: " + sourceFileStatus.getPath()); throw new IOException("File copy failed: " + sourceFileStatus.getPath() + " --> " + target, e); } incrementCounter(context, Counter.BYTESEXPECTED, sourceFileStatus.getLen()); incrementCounter(context, Counter.BYTESCOPIED, bytesCopied); incrementCounter(context, Counter.COPY, 1); }

最后调用copyToFile,copyBytes(sourceFileStatus, sourceOffset, outStream, BUFFER_SIZE, context);方法。

元数据生成(createInputFileListing)

前面提到在metafolder目录中会生成fileList.seq文件,而这个文件是怎么生成以及文件里面保存些什么内容呢?这个逻辑就在createInputFileListing(job)中完成的。 查看 UniformSizeInputFormat 类中对 createRecordReader 的实现。

@Override public RecordReader createRecordReader( InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { return new SequenceFileRecordReader(); }

回到createAndSubmitJob方法中,在job提交以前,createInputFileListing(job)。首先由getFileListingPath()方法创建一个空的seq文件,然后由buildListing()方法往这个seq文件写入数据,数据写入的整体逻辑就是遍历源路径下所有目录和文件,把每个文件的相对路径和它的CopyListingFileStatus以“K-V”对的形式写入fileList.seq每行中,最终就得到Mapper的输入了。

protected Path createInputFileListing(Job job) throws IOException { Path fileListingPath = getFileListingPath(); CopyListing copyListing = CopyListing.getCopyListing(job.getConfiguration(), job.getCredentials(), inputOptions); copyListing.buildListing(fileListingPath, inputOptions); return fileListingPath; }

job提交以后,清理metaFolder,结束。

参考文献:

https://blog.csdn.net/github_34457546/article/details/69563629


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有