Spark SQL 查询 Parquet 文件的性能提升 30%,字节是如何做到的? 您所在的位置:网站首页 sparksql执行sql文件 Spark SQL 查询 Parquet 文件的性能提升 30%,字节是如何做到的?

Spark SQL 查询 Parquet 文件的性能提升 30%,字节是如何做到的?

2023-11-28 12:33| 来源: 网络整理| 查看: 265

本文来自11月举办的 Data + AI Summit 2020 (原 Spark+AI Summit),主题为《Improving Spark SQL Performance by 30%: How We Optimize Parquet Filter Pushdown and Parquet Reader》的分享,作者为字节跳动的孙科和郭俊。

Parquet 是一种非常流行的列式存储格式。Spark 的算子下推(pushdown filters)可以利用 Parquet 文件的统计数据(比如最大最小值统计)来过滤无用数据。另一方面,Spark 用户可以启用 Spark Parquet 的向量化读取器(vectorized reader)来批量读取 parquet 文件。这些特性大大提高了 Spark 的性能,节省了 CPU 和 IO。Parquet 是字节跳动数据仓库的默认数据格式。在实践中,字节团队的同学发现 parquet 的下推过滤器的工作效果很差,其读取了大量没用的统计数据,这些统计数据对过滤 Parquet 的 row groups 无效(当 ETL 作业写入 Parquet 文件时列数据是无序的)。

在过去一年时间里,自己在 Spark 中添加了一系列的优化措施来提升 Parquet 的下推性能。我们开发了一个名为 LocalSort 的特性,在写 Parquet 文件时通过对一些列添加一个排序步骤,从而可以利用这些统计数据明显区分 Parquet Row groups,并提高压缩比(根据历史查询自动进行,不需要修改ETL作业)。此外,我们开发了一个名为 Prewhere 的特性。Prewhere parquet reader 从下推过滤器中选择低开销的列,以批处理方式来读取这些列的数据,并使用下推过滤器过滤数据,同时跳过其他不需要的列。这些努力的直接结果是,我们实现了平均 30% 的查询改进,40%的存储改进,而开销只有5%。

这篇文章将深入介绍 LocalSort 和 Prewhere,同时介绍 LocalSort/Prewhere 都有哪些用户场景,最后也会介绍一些基于历史查询自动建议对列进行排序的相关工作。

下面是本文的视频,相关 PPT 可以关注 Java与大数据架构  公众号并回复  9912 获取。

本次分享的主题就上面三个:

•Spark SQL 在字节跳动的使用;

•Spark 是如何读取 Parquet 文件的;•字节跳动是优化 Parquet Filter Pushdown 和 Parquet Reader 的

字节跳动的 Spark SQL 使用情况

这部分内容其实在上一篇文章有提过 《物化列:字节为解决 Spark 嵌套列查询性能低下的优化》。在字节超过 98% 的 ETL 作业是用 Spark SQL 进行的。Parquet 是数据仓库的默认文件格式,Parquet 向量化读取默认也是启用的通过 spark.sql.parquet.enableVectorizedReader 参数启用。

Spark 是如何读取 Parquet 文件的

首先我们来回滚一下 Parquet 文件的格式。下面是从 Parquet 官方网站拷贝过来的 Parquet 文件格式。

从上图可以看出 Parquet 格式的文件包含多个 Row Group 和一个 Footer。Parquet 文件里面包含了许多有用的信息,比如统计信息(Statistics),Spark 和 Parquet 可以使用 Footer 和 Row Group 里面的统计信息来进行过滤下推。比如某个 Row Group 的 id 列最大值为 10,当我们查询 id > 20 的时候,就可以利用统计信息过滤掉这个 Row Group。

由于 Parquet 格式的特点,所以在 Parquet 中做列裁减(column pruning)是非常容易实现的;同时 Parquet 里面对数据的压缩也是非常容易做的。

接下来我们来看下 Spark SQL 中读取 Parquet 文件的过程。

整个过程的入口点是 DataSourceScanExecution,然后是 ParquetFileFormat,最后是 VectorizedParquetReactorReader。VectorizedParquetReactorReader 是读取 Parquet 文件非常重要的类,VectorizedParquetReactorReader 可以通过把算子下推转换成 ParquetFilters 来过滤掉一些无用的 Row Group。

同时,这个类为每个目标列构建 column Readers,而这些 column Readers 是一起以批量的形式读取数据的。比如我们想读取三列的数据,VectorizedParquetReactorReader 会为我们构造三个 column Readers。

假设我们的查询 SQL 为 select * fromtable_name where date = '***' and category = 'test'。在这个例子中,date 这列是分区列,而 category是 predicate column。

上图中表格里面假设是 Parquet 文件里面每个 Row Group 分布情况。由于 RowGroup1 中 category 的最小值为 a1,最大值为 z1,所以在执行上面的 SQL 查询时需要把这个 Row Group 的数据读出来,因为 category = 'test' 包含在其中。同理,RowGroup2、RowGroup3 也需要读出来。所以在这种情况下 Spark 需要把 Parquet 文件中的三个 Row Group 都读出来,因为这三个 Row Group 的统计信息是过滤不了数据的。

在一些生产环境下,Parquet filter pushdown 不能很好的工作,因为 Parquet 中的 predicate columns 是乱序的。所以如果对一些比较常用的过滤列进行排序是非常有用的,以此来减少数据读取的 IO。

下面我们来看下另外一个例子。假设我们的查询 SQL 为 select col1 fromtable_name where date = ‘***’ and col2 = ‘test’。

在这个例子中,RowGroup1 是被过滤下推过滤了;col3 是被列裁剪过滤了。所以在向量化读取的时候 col1 和 col2 是被读取的。在很多情况下,如果 col2=‘test’ 过滤的数据非常高,那么 col1 的大部分数据都是不必要读取的。所以在查询时首先通过 filter column(col2=‘test’)来读取和过滤数据,然后再读取其他列是非常值得的做法。

字节跳动是优化 Parquet Filter Pushdown 和 Parquet Reader 的

首先字节的同学发现 Parquet 文件中 RowGroup 的统计信息是不能简单过滤数据的,所以他们希望提高 Parquet 统计信息的准确性。对于用户而言,他们的目标也是很明确的:低负载;所以过滤整个数据是非常昂贵的,甚至是不可能的。所以只能对某些列进行排序,但是新增的排序不能要求用户对现有的  ETL 做任何修改。

基于上面的原因,字节开发出 LocalSort 功能,也就是在 InsertIntoHiveTable 节点之前添加一个 SortExec 节点。通过这个特性,Spark 可以对 Parquet 中的某些列进行排序,以此来提高 Parquet 文件中统计信息的准确性。

上图是添加了 LocalSort 功能之后 Spark 的执行计划对比,可以看到,InsertIntoHiveTable 之前添加了 SortExec 节点。

那么问题来了,哪些列应该排序?字节的策略是分析历史的查询,选择那些在过滤条件中经常使用的列进行排序。同时也可以通过在表的属性里配置排序列,Spark SQL 会自动读取这些属性。所有的排序过程都是自动的,不需要用户做任何改变,从而满足用户的需求。

有了 LocalSort 之后,RowGroup 的信息就是准确的。比如上面 RowGroup1 中 category 最小值为 a1;最大值为 g1。根据上面的准确统计信息,在执行 select col1 fromtable_name where date = ‘***’ and col2 = ‘test’ 查询时,只需要读取 RowGroup2。这就使得我们可以读取更少的数据,由于相同的数据存放在一起,所以进一步导致 Parquet 文件的存储大小也变小了。而这些好处仅仅需要付出 5% 的代价,所以还是很值得的。

上面就是 Parquet filter pushdown 的优化细节。下面我们来看下 Parquet Reader 相关的优化。

我们发现 Spark 读取了很多无用的数据,所以想尽可能的读取上一点的数据。为了解决这个问题,字节的同学开发了名为 Prewhere 的功能,这个思想来自 ClickHouse。在 Prewhere 中,首先批量读取过滤的列,如果过滤的列没有匹配,那么其他列直接跳过。那么字节他们是如何实现的呢?

字节团队他们把 Parquet reader 拆分成两个 Reader:FilterReader 和 NonFilterReader。FilterReader 用于过滤列;而 NonFilterReader 用于其他列的读取。

整个过程如下:首先使用 FilterReader 来批量读取数据,然后对读取的数据使用过滤条件,如果这些数据没有符合过滤条件,则继续使用 FilterReader 读取数据。如果有数据符合过滤条件,则使用 NonFilterReader 批量的读取需要的数据同时跳过那些无用的数据,最后读取出来的数据会和 FilterReader 读取的数据进行 union 操作。

经过这个操作,一些场景下可以直接跳过 Parquet page,甚至是直接跳过读取 Parquet RowGroup。

在字节,生产环境下支持的过滤列类型有 ByteType、ShortType、IntegerType、 LongType、FloatType、 DoubleType 以及 StringType。支持的类型有 > 、>=、



【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有