Spark Hive 小文件合并 | 您所在的位置:网站首页 › hive表分区的作用 › Spark Hive 小文件合并 |
背景
小文件带来的问题
对于HDFS
从 NN RPC请求角度,文件数越多,读写文件时,对于NN的RPC请求就越多,增大NN压力。 从 NN 元数据存储角度,文件数越多,NN存储的元数据就越大。 对于下游流程下游流程,不论是MR、Hive还是Spark,在划分分片(getSplits)的时候,都要从NN获取文件信息。这个过程的耗时与文件数成正比,同时受NN压力的影响。在NN压力大,上游小文件多的情况下,下游的getSplits操作就会比较慢。 作业生成的文件数为了简化问题,假设: 不考虑一个task写出文件大小的限制,那么一个task对于一个分区(一个目录)只写出一个文件 没有数据的task不会写出文件 在MR、Spark中,设写出到HDFS的stage中task的个数为T 如果结果表没有分区,或者写出静态分区,则每个Task写出一个文件,那么最多会写出T个文件。 如果结果表有动态分区,不同的分区是写到不同的目录下,令第i个动态分区dp的基数(cardinality)为card(dpi),那么如果k个动态分区,最多写出的文件数为card(dp1) * card(dp2) * ...* card(dpk ) * T。 如何减少作业生成的文件数所以,控制最终输出的文件个数,可以从以下3个角度入手: 控制最终stage的task个数,也就是控制整个作业的并行度,具体来讲,可以从最开始单个map输入size,shuffle之后单个reduce的size两方面来控制。 在写入HDFS之后,计算平均文件大小,merge小文件(但是这种做法只能缓解NN元数据的压力,由于存在写小文件,统计平均文件大小,读小文件、写出大文件这一连串操作,会增加NN RPC的压力,在NN负载高的时候,还会增加作业本身的执行时间)。 控制最终stage的输入数据划分,让同一个分区的数据,尽量在一个task内。 Map端输入合并 Hadoop InputFormat关系InputFormat FileInputFormat TextInputFormat DeprecatedLzoTextInputFormat OrcInputFormat KeyValueTextInputFormat SequenceFileInputFormat CombineFileInputFormat HiveInputFormat ConbineHiveInputFormat 接口 具体实现 抽象类 桥接类 Spark getSplits路径HadoopRDD -> wrapSplits -> InputFormat.getSplits 切分文件,得到原始splits,通过InputFormat调用具体的序列化实现,来后去splits 合并splits,根据1的结果,再结合spark.hadoopRDD.targetBytesInPartition参数的值,将splits合并 生成HadoopRDD Hive getSplits路径CombineHiveInputFormat(extends HiveInputFormat) -> CombineFileInputFormat -> FileInputFormat(InputFormat) 切分文件,得到原始splits,FileInputFormat,调用InputFormat获取splits 合并splits,CombineFileInputFormat,根据具体的InputFormat的实现划分的split结果,合并splits CombineHiveInputFormat 根据输入的目录(可能多个),分别找到序列化方式,分别调用getSplits 每个类的作用 FileInputFormat作用 可以将一个大的文件划分为一个或者多个split,但是不能合并, 原理 getSplits如何划分文件 在对一个文件,进行split切分的时候,computeSplitSize这个函数负责计算一个split的大小。 代码块 Java protected long computeSplitSize(long goalSize, long minSize, long blockSize) { return Math.max(minSize, Math.min(goalSize, blockSize)); }对于特定的文件,totalSize是文件大小,numSplits是一个文件期望的分片数,默认是1,所有goalSize默认值也就等于一个文件的大小。 代码块 Java goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);minSize 由参数 mapreduce.input.fileinputformat.split.minsize确定,默认值是1。 在默认情况下,最终的splitSize为: splitSize = Math.max(minSize, Math.min(goalSize, blockSize)) = Math.max(1, Math.min(fileSize, blockSize)) = Math.min(fileSize, blockSize) 因此,在默认情况下,一个split size为文件大小与blockSize的较小的一个,含义是:一个文件如果小于一个block就不分,如果大于一个block(可能分布在不同dn),就可以分割,看起来也是比较合理的。 但是如果设置了期望的分片数numSplits、最小的分片大小minSize,并且minSize > min(goalSize, blockSize),minSize就会起作用,含义就是根据用户设置的最小分片大小minSize将文件切成多个splits。 新旧API的一点小差异 以上分析的是org.apache.hadoop.mapred.FileInputFormat里面的实现,DeprecatedLzoTextInputFormat就是依赖上面的实现来划分split 但是org.apache.hadoop.mapred这个包已经标记为deprecated,取而代之的是org.apache.hadoop.mapreduce下面的实现 对应的FileInputFormat为org.apache.hadoop.mapreduce.lib.input.FileInputFormat org.apache.hadoop.mapreduce.lib.input.FileInputFormat与org.apache.hadoop.mapred.FileInputFormat划分split有一些变化,computeSplitSize的时候,公式为: 代码块 SQL Math.max(minSize, Math.min(maxSize, blockSize))minSize与blockSize不变,只是把goalSize换成了mapreduce.input.fileinputformat.split.maxsize(默认256M)。 CombineFileInputFormat作用 将小的split合并成大的split 参数 参数 含义 默认值 mapreduce.input.fileinputformat.split.maxsize 一个split的最大size 256000000 (256M) mapred.min.split.size.per.node 一个node上的一个split的最小size 1 mapred.min.split.size.per.rack 一个rack上的一个split的最小size 1 这么多参数,如果我要在Hive中合并小文件,比如以256M为size合并,应该如何调参? 如果要想在map输入端以256M的size合并小文件,那么就把上面3个参数设置成256M就可以。 同时为了不必要的文件切分,把mapreduce.input.fileinputformat.split.minsize 也设置成256M。 CombineHiveInputFormat在一个MR job中,可以读取不同InputFormat方式序列化的分区目录 Map端合并参数总结如果想要在Map端按照大小S来合并文件,如何设置参数? 引擎 参数 值 Spark spark.hadoopRDD.targetBytesInPartition spark.hadoop.mapreduce.input.fileinputformat.split.maxsize spark.hadoop.mapreduce.input.fileinputformat.split.minsize S Hive mapreduce.input.fileinputformat.split.maxsize mapreduce.input.fileinputformat.split.minsize mapred.min.split.size.per.node mapred.min.split.size.per.rack S 以上分析适用于FileInputFormat(文本格式),对于ORC文件,还需要将hive.exec.orc.split.strategy 设置为ETL,原理可以参考Spark SQL参数调优指南 引擎 参数 值 Spark spark.hadoopRDD.targetBytesInPartition spark.hadoop.mapreduce.input.fileinputformat.split.maxsize spark.hadoop.mapreduce.input.fileinputformat.split.minsize S spark.hadoop.hive.exec.orc.split.strategy ETL Hive mapreduce.input.fileinputformat.split.maxsize mapreduce.input.fileinputformat.split.minsize mapred.min.split.size.per.node mapred.min.split.size.per.rack S hive.exec.orc.split.strategy ETL Shuffle之后合并无论是Spark还是Hive,在shuffle之后的合并都比较类似,都是根据上游的map的结果size,将多个map的结果合并给下游的一个reducer,具体的参数如下表: 功能 引擎 参数 值 确定下游合并的size Spark spark.sql.adaptive.shuffle.targetPostShuffleInputSize 134217728 (128M) Hive hive.exec.reducers.bytes.per.reducer 1G 确定最大reducer个数 Spark spark.sql.shuffle.partitions 2000 Hive hive.exec.reducers.max 1009? 写入HDFS之后合并原理都是统计目录下的平均文件大小,如果小于某个阈值,就再启动一个map job,来合并文件 Hive相关参数 参数 含义 值 hive.merge.mapfiles Merge small files at the end of a map-only job 合并map-only的小文件 true hive.merge.mapredfiles Merge small files at the end of a map-reduce job. 合并map-reduce的小文件 false hive.merge.size.per.task Size of merged files at the end of the job. 小文件合并之时候,期望一个map的输入大小 256M hive.merge.smallfiles.avgsize When the average output file size of a job is less than this number, Hive will start an additional map-reduce job to merge the output files into bigger files. This is only done for map-only jobs if hive.merge.mapfiles is true, and for map-reduce jobs if hive.merge.mapredfiles is true. 小文件合并的阈值 16M 在决定一个目录是否需要合并小文件的时候,会统计目录下的平均大小,然后与hive.merge.smallfiles.avgsize 比较 hive.merge.size.per.task 参数的作用,就是在合并小文件的job中,将mapreduce.input.fileinputformat.split.maxsize 、mapreduce.input.fileinputformat.split.minsize 、mapred.min.split.size.per.node、mapred.min.split.size.per.rack 这4个参数的设置成 hive.merge.size.per.task,最终通过 CombineFileInputFormat 来实现文件合并 在合并文件的时候,如何决定一个map task读多少数据? max(hive.merge.size.per.task, hive.merge.smallfiles.avgsize) Spark 参数spark.sql.mergeSmallFileSize 与 hive.merge.smallfiles.avgsize 类似 spark.sql.targetBytesInPartitionWhenMerge 与hive.merge.size.per.task 类似 策略在决定一个目录是否需要合并小文件的时候,会统计目录下的平均大小,然后与spark.sql.mergeSmallFileSize 比较 在合并文件的时候,如何决定一个map task读多少数据 max(spark.sql.mergeSmallFileSize, spark.sql.targetBytesInPartitionWhenMerge , spark.hadoopRDD.targetBytesInPartition ) Spark为什么有2个参数决定map端的输入有时候在作业的最开始的输入,不需要合并小文件,但是作业写出到目标表之后,需要合并文件,所以需要有两个参数把这两种情况加以区分。 spark.hadoopRDD.targetBytesInPartition,来设置最开始的输入输入端的map合并文件大小。 spark.sql.targetBytesInPartitionWhenMerge 与hive.merge.size.per.task类似,是设置额外的合并job的map端输入size。 Spark与Hive参数对比作用 引擎 参数 触发小文件合并的阈值 Spark spark.sql.mergeSmallFileSize Hive hive.merge.smallfiles.avgsize 合并小文件时候,map的输入size控制 Spark spark.sql.targetBytesInPartitionWhenMerge spark.hadoopRDD.targetBytesInPartition Hive hive.merge.size.per.task 合并小文件时候,实际的map输入size确定 Spark max(spark.sql.mergeSmallFileSize, spark.sql.targetBytesInPartitionWhenMerge , spark.hadoopRDD.targetBytesInPartition ) Hive max(hive.merge.size.per.task, hive.merge.smallfiles.avgsize) 不同分区的数据分布对于多级动态分区的情况,在写入结果表之前,最好能够根据分区进行distribute by一下,按照分区重新shuffle,让同一分区的数据,集中在一个task里面。比如有三个动态分区dp1, dp2, dp3,就distribute by dp1, dp2, dp3 参考http://flyingdutchman.iteye.com/blog/1876400 http://blog.javachen.com/2013/09/04/how-to-decide-map-number.html https://www.cnblogs.com/yurunmiao/p/4282497.html https://blog.csdn.net/wawmg/article/details/17095125 |
CopyRight 2018-2019 实验室设备网 版权所有 |