EMR Spark-SQL性能极致优化揭秘 Native Codegen Framework

简介: EMR团队探索并开发了SparkSQL Native Codegen框架,为SparkSQL换了引擎,新引擎带来最高4倍性能提升,为EMR再次获取世界第一立下汗马功劳。来自阿里云EMR团队的周克勇将详细介绍Native Codegen框架。

EMR团队探索并开发了SparkSQL Native Codegen框架,为SparkSQL换了引擎,新引擎带来最高4倍性能提升,为EMR再次获取世界第一立下汗马功劳。来自阿里云EMR团队的周克勇将详细介绍Native Codegen框架。本文整理自视频 https://developer.aliyun.com/live/43579


本次分享主要分为三部分,第一做这件事情的动机和背景,第二做的过程中解决的核心问题,最后是总结。

有些同学可能了解到,EMR团队今年4月份打破了大数据领域Benchmark TBCDS的世界纪录。在硬件完全相同的情况下,性能提升了一倍,从520万分提高到1100多万分。这个成绩的背后主要依赖两条技术,增强了Optimizer和Native Runtime。Optimizer层面,我们在之前工作的基础上,又做了诸如 CTE 、动态分区裁剪、小表广播、PK/FK、Fast Decimal等优化。
截屏2020-09-04 下午10.10.03.png

大家如果关注刚结束的SparkSummit,会发现一些类似的技术,如动态分区裁剪已经进了最新的Spark3.0,EMR版本的Spark在几个月之前就支持了。另一条技术是Native Runtime,也是今天分享的主题,涵盖的主要工作,包括 Native Codegen、统一内存布局、Batch化执行框架,后续会详细介绍。

大家都知道 Optimizer的目的是获取最好的执行计划,主要技术包括states收集和Cost Model,难点是静态states不够准确,无法在Plan阶段准确预知Filter或join之后的数据量,因此对后续Plan的代价评估不够准确。

今年SparkSummit发布的adaptive Execution,就把动态stats收集和plan优化结合在一起来解决这个问题。相对应的 Runtime的目的是针对选定的plan,如何使它跑得更快,长期以来 Runtime的主要工作基本上都聚焦在解决当下的新硬件瓶颈。如MapReduce刚出来时,网络带宽是瓶颈,所以Google做了很多locality方面的优化。Spark刚出来时解决的硬件瓶颈是磁盘I/O,它通过内存缓存来提升性能。

再后来 CPU成了新的瓶颈,我们可以看到从10年到20年,磁盘I/O和网络带宽都有了每年数量级的提升,但是CPU的主频基本上保持不变,因此CPU成了新的硬硬件瓶颈,提升CPU性能,成为近年来 Runtime领域重要的优化方向。优化CPU主要有两条技术路线,向量化和Codegen。我们先看一下传统的 SQL执行所应用的火山模型的问题所在,这是一个简单的Select家,Filter加Project加Agg的例子。

截屏2020-09-04 下午10.10.37.png

在执行的过程中,在火山模型中,每个算子都是一个迭代器,下游的算子,调上游算子的next方法,next返回当前算子处理之后的中间结果。这个模型最大的问题是每条record在经过每一个算子的时候,都要经过一次虚函数调用,而虚函数调用的开销是非常大的。

第二个问题就是在每个算子之间需要把中间的结果物化到内存。针对这个问题,向量化技术给出的解,是通过批量执行加列式存储,加小循环,来更好的利用 SIMD的指令和CPU的乱序执行,从而最大化数据并行度和指令并行度,从而分摊掉虚函数调用的开销,并提升执行性能。

例如上面例子里Agg 算子计算过程,他把输入 column1,column2以及 Agg的输出结果sum都存在数组里,然后通过一个很紧凑的for循环进行计算。由于循环足够简单,编译器会做循环展开和SIMD的优化。从截图中我们可以看到,编译器生成了很多向量化的指令,此外,由于for循环足够简单,然后for循环内部基本上都是访存指令,如访问colum1的第i个数据,colum2的第i个数据,所以每次放循环最主要的时间都是在进行访存,而因为 for循环足够的短,所以CPU的乱序执行的窗口里,可以同时发射多条漏斗指令,从而解决了 Memory Wall的问题。

这个技术的代表是MonetDB/X100(2005),以及今年SparkSummit宣布的 photon(2020)技术,主要的缺点是中间缓存的数据量比较大,Codegen技术的给出的解释算子融合,他打破了Stage内部算子间的界限,拼出来跟原来的逻辑保持一致的裸的代码通常是一个大的for循环,然后把拼成的代码编译成可执行文件,这里 面展示的跨越的第一个Stage拼出来的代码,可以看到最外层是一个大的for循环,接下来是Filter,表达了 Filter算术的语义,然后在Filter的内部是Agg的语意,拼出的代码完全不存在迭代器和额外的函数调用,就像是一个新手手写的代码,而这种代码不存在任何框架上的Overhead,性能往往是最好的。

Spark的Codegen把拼成的代码交给 Janino模块做编译,在运行的时候直接load即时编译出来的class文件。Codegen技术的好处有几点:

1.用for循环代替了迭代器,完全消除了虚函数调用;

2.没有了雾化,中间数据都保存在寄存器里。它的缺点就是因为 for循环比较大,而且每次迭代执行的逻辑非常的复杂,所以很难应用SIMD的优化。这个技术的代表是Hyper和Apache Spark,尽管Spark的Java Codegen,相比之前有了数量级的提升,但依然有一些不足。首先是Java的性能还是弱于Native Code,二是Java语义的限制,例如无法显示使用 SIMD或Prefetch之类,并且由于机器的存在,无法自主精细化控制内存。

截屏2020-09-04 下午10.11.09.png

3.NativeCode更容易跟新硬件进行交互。基于这个原因,我们决定使用 Native Runtime替换Java Runtime。同时我们不想对现有的Spark做太多的改动,所以最终我们选择了Codegen技术路线,结合起来就是Native Codegen。


接下来介绍我们做Native codegen解决的核心问题,集中在三个方面,我们要生成什么代码,怎么生成这些代码,以及怎么样跟Spark做集成。
截屏2020-09-04 下午10.11.35.png

第一个问题,生成什么?

如今的NativeCode有很多,C/C++。Go Rest,LLVM等。基于我们自己的技能点,其实可以选择的就只有C/C++, C++实现起来相对直观,只需要对照原来生成的Java代码,替换成C++即可。但C++最大的问题是它在编译时间过长,根据HyPer的论文,C++的编译时间比LLVM高出了一个数量级。LLVM的编译时间很短,而且执行的效率跟C++相当,看上去是一个很不错的选择。
截屏2020-09-04 下午10.11.54.png

其实很多Native Codegen这样的系统都选择了LLVM,包括HyPer,Impala以及阿里云自研的MaxComputer,ADB等,但LLVM对我们来说还是过于复杂,它的语法接近汇编,是想用汇编重写SQL算法的工作量会有多大,其实大多数引擎也不会用
LLVM写全量的代码,比如HyPer,解码算子的核心逻辑,用LLVM生成其他通用的功能,包括spill复杂数据结构的管理等,实际上是用C++提前编写好并进行编译。 即便如此,LLVM对我们来说依然过于复杂,在广泛调研之后,另外一种可能性出现了 Weld。

先介绍一下Weld

这个是Spark的作者matei的学生的作品,他提供了包括Language+Compiler+Runner的工具链,最终会转化成LLVM,然后用LLVM的工具链编译执行, Weld最初想解决的问题是不同lib之间相互调用时数据传输的开销,例如要在pandas里调用numpy的接口,首先pandas把数据写入内存,然后numpy读取内存进行计算。
截屏2020-09-04 下午10.12.23.png

对于极度优化的Library来说,内存的写入和读取的时间可能会远超计算本身。针对这个问题,Weld开发了Common Runtime,并配套提供了一组IR,再加上惰性求值的特性,只需要简单修改Library,使其符合Weld的语法规范,便可以做到不同Library共用Weld的Runtime,再利用惰性求值实现快Library的Pipeline,从而省去数据物化的开销。Weld Runtime还做了若干优化,如循环融合循环展开,向量化自适应执行等。

此外Weld支持调用C代码,可以方便的调用三方库。我们感兴趣的是Weld提供的IR和对应的Runtime。

Weld IR语法是针对关系代数进行设计的,非常适合表达SQL语句。数据结构层面,Weld IR最核心的数据结构是vec和struct,对应C语言里的数组和struct,能较好的表达Spark SQL的 Row Batch基于struct和vec,可以构造字典数据结构,能够比较好的表达SQL里面重度使用Hash结构,操作层面,Weld IR提供了类函数式语言的语义,如Map,Filter,Iterator等配合Builder语义,能方便地表达Project、Filter、Agg、Broadcast join等算子语义,例如 select加Filter的例子,用Weld IR的表达如下,第一行是函数签名,表示入参是一个数组,数组的元素是一个struct,strut包含两个int32的成员。

接下来就是一个大的 for表达式,跟常见的语法不同,for表达式包含三个参数

1.需要遍历的数组;

2.Build,用来生成最终的结果。 Build类型也决定了最终生成的结果的。用什么数据结构来存储。

3.lambda,用来定义针对每个元素的操作,在这个例子里面,第一个参数就是这个函数的入参v第二个参数是append,表示最终构造的结果,存在一个数组里面。第三个,lambda参数是一个if表达式, if的语义跟我们常见的也不太相同,它实际上是把 if的true和false的两个分支都作为参数表达,其中第一个参数是condition,第二个参数是当condition为true的时候,所执行的逻辑。

第三个参数肯定是认为false的时候执行的逻辑,在这个里面可以看到当第二个成员它是从0开始计数,当第二个成员大于10的时候,会把第一个成员 merge到 appender里面。否则的话就什么都不做,直接返回原来的build。Weld的IR。通过 weld_module_compile和weld_module_run,两个接口,分别做编译和执行。由于Weld同时兼顾了语法简洁,编译时间短的特性,因此我们选择Weld作为生成的目标。

第二个问题就是怎么生成?

我们复用了Spark Codegen框架。我们知道 Spark Codegen包含Expression和Stage两个级别,在Expression级别,我们对照原来的doGenCode()的接口,增加了doGenNativeCode(),里面拼出来的是Weld的语法,例如之前可能Java的代码里面就直接是两个变量的相加,然后改造了以后就成了一个struct的两个成员的相加。在WholeStage级别,我们复用了producer/ consumer的框架,熟悉Spark源码的同学应该了解到,在producer/consumer框架下,每个算子都提供了produce和consume接口,produce的职责是生成为下游提供数据的代码,consumer的职责是生成消费上游数据的代码,Spark 中并非所有的算子都支持Codegen,例如outjoin就不支持支持Codegen的算子,继承了CodeGenSupport的接口,我们对整个producer/consumer的框架并没有改动,在他们旁边又新增加了一系列的接口,包括 NativeCodeGenSupport/doProduceNative/doConsumeNative。
截屏2020-09-04 下午10.12.49.png
以一个具体的例子加以说明,还是一个相比较简单的select加Filter加Project的例子, query包含三个算子,Scan、Filter、Project。
截屏2020-09-04 下午10.13.13.png

然后 query他的代码生成的过程是右上角的这张图。首先 project就是最下游的算子,它的produce方法会返回最终生成的代码的字符串。然后它这个是怎么生成的呢?Project 的doProduce。直接调用了 Filter的doProduce方法,然后Filter的doProduce方法直接调用了Scan的doProduce,然后Scan的 doProduce会生成一个框架代码,在框架代码的内部会调用Scan的 doConsume。Scan的doConsume。直接调用Filter的doConsume。Filter的doConsume会生成Filter的逻辑,并在内部调用Project的doConsume,Project的 doConsume。会把最终的数据输出 append的到 output中。

我们看下面这三张图,Scan的 doProduce会生成for循环的一个架子。然后在for循环的每个迭代里面调用 Filter的doConsume方法, Filter的doConsume会生成一个if的表达式的框架,然后在判断为true,也就是if的内部的话,调用的是project的 doConsume。 最后project的doConsume拼成一段append 的方法把column1 append到 output里面到此为止。一个完整的Java的Codegen过程就结束了,然后我们就拿得到了直接可以编译的Java代码,当然这个是简化的过程。
截屏2020-09-04 下午10.13.57.png

对于Native Codegen的话,我们是复用了这个逻辑,只是把生成的Java代码替换成了Weld的IR,如底下三张图所示,具体的Weld,语法我就不详细展开了。
截屏2020-09-04 下午10.15.28.png

感兴趣的同学可以到Weld官网上看语法定义,代码生成还有一个问题就是Fallback机制,由于人力有限,我们无法覆盖所有的算子,因此需要实现Fallback机制。这里需要做的决定是应该做算子级别的Fallback,还是Stage级别的Fallback。直观上算子粒力度的Fallback好像更加合理,实际上却会导致更严重的问题。它会导致Stage内部Pipeline的断裂。前面讲了Codegen的一个优势是整个过程不存在物化,而算子力度的Fallback则会导致Stage内部一部分算是走Native Runtime,另一部分走Java Runtime,两者的连接数无可避免存在数据物化,开销通常会大于Native Runtime带来的收益。
截屏2020-09-04 下午10.15.45.png
基于这个原因,我们选择Stage级别的Fallback,一旦有任何算子不支持Native Codegen,在整个Stage都Fallback到Java Codegen,代码也已经生成了。

最后的问题,如何跟Spark集成 。

task的执行可以理解为一个黑盒,它的输入是Row Batch或者Row Iterator我们知道在Scan Stage Spark用了向量化读的优化,读出来的是列式存储的 column batch,每一列本质上都用一个数组进行存储,而在Shuffle Stage,Shuffle Fetch回来的数据结构是行式存储的 unsaferowbatch。每个Stage的输出会封装成会封装成Row Iterator。
截屏2020-09-04 下午10.16.11.png

我们前面讲到既然选择了Stage级别的Fallback,意味着黑盒要么是Java Runtime要么是Native Runtime,不存在混合的情况。因此我们关心的如何把输入转化为Weld认识的内存布局,以及如何把Weld的输出包装成Row Iterator。针对列存数据,打开offheap开关,数据天生就是指针数组,Weld可以直接操作。对于行存数据,主要问题是变长数据难以映射到 Weld的 struct右上方的图展示了Spark Row Batch的内存布局,首先是固定长度的,null bitmap,然后是固定长度的列数据,最后是变长数据,由于变长数据的存在,无法直接把一条record映射成 strut。

我们的做法是把定长部分和变长部分分别拷贝出来,并有offset和length来标志变量部分的位置和长度。这样一来record就能映射到strut结构了,而整个Row Batch就映射成了一个 vec strut。例如这个例子,每个record包括两个long和一个String,null bit用一个long表示,紧接着是两个long表示两个列的数据。第三个long保存变长数据的 offset和length,最后是变成部分,我们把变成拷贝出去之后,根据原先的offset和length,计算新的offset和length,最终我们用1个包含5个long的strut表示 record,分别是 null bitmap原先的两个long offset和length。这样一来我们就完成了统一内存布局,并且当且仅当有变长数据存在的时候才需要拷贝,否则的话是不需要拷贝的。Weld输出转换成Row Batch是刚才所说的过逆向过程,这里就不再赘述了,完成了数据转换,最后是Spark的执行流程。首先我们尝试走Native Codegen,若有异常发生,则切换到Java Codegen。若没有异常,则执行StageInit做初始化工作,包括初始化Weld,加载编译好的Weld module,拉取Broadcast数据等。

接着是一个循环,每个循环会读取一个Row Batch,给Native Runtime来执行,输出结果包装成Row Iterator,给Shuffle Write。以上就是EMR团队在Native Runtime上做的探索。总结下来,我们采用Weld的IR作为代码生成的目标语言,复用了Spark Codegen框架,进行代码生成,采用了Stage级别的Fallback机制,并通过统一内存布局跟Spark做了集成。
截屏2020-09-04 下午10.16.30.png

由于时间有限,一些工作没有包含在今天的分享中,例如 Weld的不好表达的算子,如SortMergejoin、Partitionby ,我们其实也用了Native的技术进行了优化。再例如 Weld本身的字典的实现效率比较低,我们也对此进行了比较大的优化。除了Native Runtime,EMR团队在Spark很多技术点都做了工作,欢迎大家交流沟通。
截屏2020-09-04 下午10.16.55.png
截屏2020-09-04 下午10.26.56.png

相关实践学习
数据湖构建DLF快速入门
本教程通过使⽤数据湖构建DLF产品对于淘宝用户行为样例数据的分析,介绍数据湖构建DLF产品的数据发现和数据探索功能。
快速掌握阿里云 E-MapReduce
E-MapReduce 是构建于阿里云 ECS 弹性虚拟机之上,利用开源大数据生态系统,包括 Hadoop、Spark、HBase,为用户提供集群、作业、数据等管理的一站式大数据处理分析服务。 本课程主要介绍阿里云 E-MapReduce 的使用方法。
相关文章
|
25天前
|
SQL 存储 关系型数据库
一文搞懂SQL优化——如何高效添加数据
**SQL优化关键点:** 1. **批量插入**提高效率,一次性建议不超过500条。 2. **手动事务**减少开销,多条插入语句用一个事务。 3. **主键顺序插入**避免页分裂,提升性能。 4. **使用`LOAD DATA INFILE`**大批量导入快速。 5. **避免主键乱序**,减少不必要的磁盘操作。 6. **选择合适主键类型**,避免UUID或长主键导致的性能问题。 7. **避免主键修改**,保持索引稳定。 这些技巧能优化数据库操作,提升系统性能。
221 4
一文搞懂SQL优化——如何高效添加数据
|
2天前
|
SQL 分布式计算 资源调度
一文解析 ODPS SQL 任务优化方法原理
本文重点尝试从ODPS SQL的逻辑执行计划和Logview中的执行计划出发,分析日常数据研发过程中各种优化方法背后的原理,覆盖了部分调优方法的分析,从知道怎么优化,到为什么这样优化,以及还能怎样优化。
|
10天前
|
SQL 关系型数据库 数据库
【后端面经】【数据库与MySQL】SQL优化:如何发现SQL中的问题?
【4月更文挑战第12天】数据库优化涉及硬件升级、操作系统调整、服务器/引擎优化和SQL优化。SQL优化目标是减少磁盘IO和内存/CPU消耗。`EXPLAIN`命令用于检查SQL执行计划,关注`type`、`possible_keys`、`key`、`rows`和`filtered`字段。设计索引时考虑外键、频繁出现在`where`、`order by`和关联查询中的列,以及区分度高的列。大数据表改结构需谨慎,可能需要停机、低峰期变更或新建表。面试中应准备SQL优化案例,如覆盖索引、优化`order by`、`count`和索引提示。优化分页查询时避免大偏移量,可利用上一批的最大ID进行限制。
36 3
|
26天前
|
SQL 关系型数据库 MySQL
【MySQL技术之旅】(7)总结和盘点优化方案系列之常用SQL的优化
【MySQL技术之旅】(7)总结和盘点优化方案系列之常用SQL的优化
40 1
|
28天前
|
SQL 索引
SQL怎么优化
SQL怎么优化
30 2
|
1月前
|
分布式计算 运维 大数据
阿里云 EMR Serverless Spark 版免费邀测中
阿里云 EMR Serverless Spark 版,以 Spark Native Engine 为基础,旨在提供一个全托管、一站式的数据开发平台。诚邀您参与 EMR Serverless Spark 版免费测试,体验 100% 兼容 Spark 的 Serverless 服务:https://survey.aliyun.com/apps/zhiliao/iscizrF54
394 0
阿里云 EMR Serverless Spark 版免费邀测中
|
1月前
|
SQL 监控 测试技术
SQL语法优化与最佳实践
【2月更文挑战第28天】本章将深入探讨SQL语法优化的重要性以及具体的优化策略和最佳实践。通过掌握和理解这些优化技巧,读者将能够编写出更高效、更稳定的SQL查询,提升数据库性能,降低系统资源消耗。
|
4月前
|
关系型数据库 MySQL BI
用友畅捷通基于阿里云 EMR StarRocks 搭建实时湖仓实战分享
本文从用友畅捷通公司介绍及业务背景;数据仓库技术选型、实际案例及未来规划等方面,分享了用友畅捷通基于阿里云 EMR StarRocks 搭建实时湖仓的实战经验。
601 0
用友畅捷通基于阿里云 EMR StarRocks 搭建实时湖仓实战分享
|
8月前
|
存储 SQL 分布式计算
阿里云全托管flink-vvp平台hudi connector实践(基于emr集群oss-hdfs存储)
阿里云全托管flink-vvp平台hudi sink connector实践,本文数据湖hudi基于阿里云E-MapReduce产品,以云对象存储oss-hdfs作为存储
|
11月前
|
SQL 存储 监控
水滴筹基于阿里云 EMR StarRocks 实战分享
水滴筹大数据部门的数据开发工程师韩园园老师为大家分享水滴筹基于阿里云EMR StarRocks的实战经验。
5781 3
水滴筹基于阿里云 EMR StarRocks 实战分享