EMR Spark Relational Cache 利用数据预组织加速查询

简介: 本文介绍了EMR Spark的Relational Cache如何从数据量较大的Cube中快速提取出所需数据加速查询的原理。通过列式存储、文件索引、Z-Order等技术,我们可以快速过滤数据,大大减少实际发生的IO数据量,避免IO瓶颈的出现,从而优化整体查询性能。

作者:
王道远,花名健身,阿里云EMR技术专家,Apache Spark活跃贡献者,主要关注大数据计算优化相关工作。


Relational Cache相关文章链接:
使用Relational Cache加速EMR Spark数据分析
使用EMR Spark Relational Cache跨集群同步数据
EMR Spark Relational Cache的执行计划重写
EMR Spark Relational Cache如何支持雪花模型中的关联匹配

背景

在利用Relational Cache进行查询优化时,我们需要通过预计算,存储大量数据。而在查询时,我们真正需要读取的数据量也许并不大。为了能让查询实现秒级响应,这就涉及到优化从大量数据中快速定位所需数据的场景。本文介绍在EMR Spark Relational Cache中,我们如何针对这种场景进行了优化。

存储格式

在数据存储格式上,我们默认选择Spark社区支持最好的Parquet格式。Parquet是一种列式存储格式,我们可以很方便地利用列式存储格式进行字段裁剪。另外,Parquet的每个数据文件由多个Row Group组成,同时在每个数据文件的footer中记录了各个Row Group的统计信息,如最大值、最小值等。这些统计信息可以在读取数据时减少实际的IO开销。事实上,在现在的Spark版本中,我们可以看到Catalyst优化器已经把可以下推的一些过滤条件下推到了Parquet reader,利用Parquet文件的统计信息过滤真正需要读取的Row Group,从而实现减少IO量,加速查询时间的效果。这也是列存格式基本都支持的功能。

对于Relational Cache而言,有很多过滤条件时确定已知的。我们直接利用这一特性,将确定的查询条件下推到Parquet reader里,由Parquet reader完成对Row Group的选择。由于实际要读取的数据量占总数据量的比重往往很小,这种过滤的实际效果还是比较好的。

image

数据分区

对于Spark Relational Cache来说,由于构建Cube时会使用到Expand算子,我们需要引入Grouping ID来区分不同的grouping set。在大部分后续的查询中,我们往往只需要其中一个Grouping ID所对应的数据。因此,Grouping ID成了一个天然的数据分区选择。在Hive/Spark等大数据分析引擎中,数据分区是对于结构化数据,将其中一个或多个字段的具体值作为目录,分目录存放文件的一种常见做法。当我们确定要选择某Grouping ID对应的数据时,我们只需读取对应目录中的数据即可。这种做法可以直接忽略Grouping ID不匹配的文件,从而大大减少启动的总task数量,减少Spark的任务调度开销。

文件索引

当总数据量较大时,存储的文件数也会比较多。此时即使我们通过Parquet的footer可以获得较好的过滤效果,我们还是要启动一些task去读取这些footer。在Spark实际的实现中,往往需要与文件数量的量级相当的task去进行footer读取。在集群计算资源有限时,调度这些任务就显得比较浪费时间。为了能进一步减少Spark作业的调度开销,进一步提高执行效率,我们实现了文件索引来优化这种场景。

文件索引就类似于独立的footer。我们提前收集每个文件中各字段数据的最大最小值,并存储在一个隐藏的数据表中。这样,我们只需要读取一个单独的表就可以从文件层面对需要处理的文件做一个初步的过滤。这是一个单独的stage,由于一个文件只对应这个隐藏表中的一条记录,因此读取隐藏表所需的task数量要远远小于读取所有数据文件footer的开销。后续stage的任务数量也因此可以大大减少。在Relational Cache的访问场景下,整体加速效果非常明显。

数据排序

为了能实现高效的数据准备过程,不论是在Parquet文件的footer还是在我们实现的文件索引中,都是主要依靠最大值和最小值的信息来过滤数据。那么在极端场景下,光靠这些统计信息可能会完全没有过滤的效果。举个例子,如果某个key的所有数据文件、所有Row Group的最大值和最小值都等于全局最大值和最小值时,对这个key的过滤就完全无效了。这样,我们会自然而然的想到对数据进行排序。

但是,传统的数据排序还有一个问题。在数据库中,当我们对多个字段进行排序时,往往字段之间具有主次关系,这就导致排序字段序列中,排在最前面的字段有很好的过滤效果,而排得靠后的字段因为数据分散,往往过滤效果越来越差。这就需要我们找到更好的排序方法,能够兼顾到多个字段的数据过滤效果。

这里涉及到一个空间填充曲线的概念。我们可以把数据想像成一个有限空间,如何将数据进行排序和分块,能够使得每一块的最值都只是在一个不大的范围内,从而让文件索引获得较好的过滤效果呢?我们选择了Z-Order曲线对多维数据空间进行排序,这样可以保证每列都有较为均衡的过滤效果。下图是二维空间中Z-Order曲线的示意图。

image
不过我们也要注意到,随着排序列的增加,单列的过滤效果将会越来越差。因此在实际运用中,我们也要对排序列进行取舍,才能获得最佳的整体效果。

小结

本文介绍了EMR Spark的Relational Cache如何从数据量较大的Cube中快速提取出所需数据加速查询的原理。通过列式存储、文件索引、Z-Order等技术,我们可以快速过滤数据,大大减少实际发生的IO数据量,避免IO瓶颈的出现,从而优化整体查询性能。

参考
Apache Parquet
Processing Petabytes of Data in Seconds with Databricks Delta
Z-Order Curve


_

相关文章
|
SQL 分布式计算 NoSQL
Spark 操作 kudu -- 增加,删除,修改,查询操作 | 学习笔记
快速学习 Spark 操作 kudu -- 增加,删除,修改,查询操作
1361 0
Spark 操作 kudu -- 增加,删除,修改,查询操作 | 学习笔记
|
28天前
|
分布式计算 运维 大数据
阿里云 EMR Serverless Spark 版免费邀测中
阿里云 EMR Serverless Spark 版,以 Spark Native Engine 为基础,旨在提供一个全托管、一站式的数据开发平台。诚邀您参与 EMR Serverless Spark 版免费测试,体验 100% 兼容 Spark 的 Serverless 服务:https://survey.aliyun.com/apps/zhiliao/iscizrF54
385 0
阿里云 EMR Serverless Spark 版免费邀测中
|
4月前
|
SQL 分布式计算 DataWorks
DataWorks on emr 创建spark节点指定dlf的catalog?
DataWorks on emr 创建spark节点指定dlf的catalog?
33 0
|
5月前
|
SQL 分布式计算 Java
201 Spark SQL查询程序
201 Spark SQL查询程序
34 0
|
分布式计算 分布式数据库 Scala
Spark查询Hbase小案例
写作目的 1)正好有些Spark连接HBase的需求,当个笔记本,到时候自己在写的时候,可以看 2)根据rowkey查询其实我还是查询了好久才找到,所以整理了一下 3)好久没发博客了,水一篇
178 0
Spark查询Hbase小案例
|
SQL 存储 DataWorks
视频-《 EMR 数据开发》|学习笔记(四)
快速学习视频-《 EMR 数据开发》
193 0
视频-《 EMR 数据开发》|学习笔记(四)
|
存储 分布式计算 资源调度
Spark on k8s 在阿里云 EMR 的优化实践
本文整理自阿里云技术专家范佚伦在7月17日阿里云数据湖技术专场交流会的分享。
Spark on k8s 在阿里云 EMR 的优化实践
|
SQL JSON 分布式计算
Spark SQL DataFrame查询和输出函数一文详解运用与方法
Spark SQL DataFrame查询和输出函数一文详解运用与方法
462 0
Spark SQL DataFrame查询和输出函数一文详解运用与方法
九十、Spark-SparkSQL(查询sql)
九十、Spark-SparkSQL(查询sql)
九十、Spark-SparkSQL(查询sql)
|
SQL 存储 缓存
Spark SQL中掌控sql语句的执行 - 了解你的查询计划
Spark SQL中掌控sql语句的执行 - 了解你的查询计划
530 0
Spark SQL中掌控sql语句的执行 - 了解你的查询计划