草庐IT

Spark AQE SkewedJoin 在字节跳动的实践和优化

字节 LAS 团队 2023-03-28 原文

1. 概述

本文将首先介绍 Spark AQE SkewedJoin 的基本原理以及字节跳动在使用 AQE SkewedJoin 的实践中遇到的一些问题;其次介绍针对遇到的问题所做的相关优化和功能增强,以及相关优化在字节跳动的收益;此外,我们还将分享 SkewedJoin 的使用经验。

2. 背景

首先对 Spark AQE SkewedJoin 做一个简单的介绍。Spark Adaptive Query Execution, 简称 Spark AQE,总体思想是动态优化和修改 stage 的物理执行计划。利用执行结束的上游 stage 的统计信息(主要是数据量和记录数),来优化下游 stage 的物理执行计划。

Spark AQE 能够在 stage 提交执行之前,根据上游 stage 的所有 MapTask 的统计信息,计算得到下游每个 ReduceTask 的 shuffle 输入,因此 Spark AQE 能够自动发现发生数据倾斜的 Join,并且做出优化处理,该功能就是 Spark AQE SkewedJoin。

例如 A 表 inner join B 表,并且 A 表中第 0 个 partition(A0)是一个倾斜的 partition,正常情况下,A0 会和 B 表的第 0 个 partition(B0)发生 join,由于此时 A0 倾斜,task 0 就会成为长尾 task。

SkewedJoin 在执行 A Join B 之前,通过上游 stage 的统计信息,发现 partition A0 明显超过平均值的数倍,即判断 A Join B 发生了数据倾斜,且倾斜分区为 partition A0。Spark AQE 会将 A0 的数据拆成 N 份,使用 N 个 task 去处理该 partition,每个 task 只读取若干个 MapTask 的 shuffle 输出文件,如下图所示,A0-0 只会读取 Stage0#MapTask0 中属于 A0 的数据。这 N 个 Task 然后都读取 B 表 partition 0 的数据做 join。这 N 个 task 执行的结果和 A 表的 A0 join B0 的结果是等价的。

不难看出,在这样的处理中,B 表的 partition 0 会被读取 N 次,虽然这增加了一定的额外成本,但是通过 N 个任务处理倾斜数据带来的收益仍然大于这样的成本。

Spark 从3.0 版本开始支持了 AQE SkewedJoin 功能,但是我们在实践中发现了一些问题。

  • 不准确的统计数据可能导致 Spark 无法识别数据倾斜。
  • 切分不均匀导致优化处理效果不理想。
  • 不支持复杂场景例如同一个字段发生连续 join。
我将在【优化增强】中详述这些问题以及我们的优化和解决方案。

3. 优化增强

3.1 提高数据倾斜的识别能力

由 Spark AQE 处理数据倾斜的原理不难发现,Spark AQE 识别倾斜以及切分数据倾斜的功能依赖于上游 Stage 的统计数据,统计数据越准确,倾斜的识别能力和处理能力就越高,直观表现就是倾斜数据被拆分的非常平均,拆分后的数据大小几乎和中位数一致,将长尾Task的影响降到最低。

MapStage 执行结束之后,每一个 MapTask 会生成统计结果 MapStatus,并将其发送给 Driver。MapStatus维护了一个 Array[Long],记录了该 MapTask 中属于下游每一个 ReduceTask 的数据大小。当 Driver 收集到了所有的 MapTask 的MapStatu之后,就能够计算得到每一个 ReduceTask 的输入数据量,以及分属于每一个上游 MapTask 的数据大小。根据每一个 ReduceTask 的数据大小,Spark AQE 能够判断出数据倾斜,并根据上游 MapTask 的统计信息,合理切分 Reducetask,尽可能保证切分的均匀性。

如下图描述,ReduceTask0 的 ShuffleRead(shuffle 过程中读取的数据量) 为 200,明显大于 ReduceTask1 和 ReduceTask2 的 100,发生了数据倾斜。我们可以将 ReduceTask0 拆成 2 份,ReduceTask0-0 读取 MapTask0 和 MapTask1 的数据,ReduceTask0-1 读取 MapTask2 和 MapTask3 的数据,拆分后的两个 task 的 ShuffleRead 均为 100。

我们可以看出,统计信息的大小的空间复杂度是 O(M*R),对于大任务而言,会占据大量的 Driver 内存,所以 Spark 原生做了限制,对于 MapTask,当下游 ReduceTask 个数大于某一阈值(spark.shuffle.minNumPartitionsToHighlyCompress​,默认 2000),就会将MapStatus进行压缩,所有小于 spark.shuffle.accurateBlockThreshold(默认100M)的值都会被一个平均值所代替填充。

举个例子,下图是我们遇到的一个 SkewedJoin 没有生效的作业,从运行 metrics 来看,ShuffleRead 发生了很严重的倾斜,符合 SkewedJoin 生效的场景,但实际运行时并没有生效。

通过阅读日志,可以看到,Spark AQE 在运行时,获取的 join 两侧的 shuffle partitions 的中位数和最大值都是一样的,所以没有识别到任何的倾斜。这就是由于压缩后 MapStatus 的统计数据的不准确造成的。

我们在实践中,遇到很多大作业由于统计数据不准确,无法识别倾斜。而当我们尝试提高这一阈值之后,部分大作业由于 Driver 内存使用上涨而失败,为了解决这一问题,我们做了以下优化:

  1. Driver 收到详细的 MapStatus之后,先将数据用于更新每个 ReduceTask 的累计输入数据,然后将 MapStatus压缩,这样就不会占用太多内存。此时,虽然压缩后的 MapStatus无法让我们获得 ReduceTask 准确的上游分布,但是能够获得准确的 ReduceTask 的输入数据总大小,这样我们就能够准确的识别发生倾斜的 ReduceTask。
  2. 上述优化增加了一次 MapStatus 的解压操作,而 MapStatus 的解压是一个比较耗CPU的操作,对于大作业可能出现 Driver CPU 被打满,无法处理 Executor 心跳导致作业失败的情况。对此,我们使用缓存保证Driver端在消费 MapStatus 时,每个 MapStatus 只会被解压一次,大大降低了优化带来的 Overhead。
通过上述优化,我们成功在线上将默认阈值从 2000 调整为 5000,保证了线上 96.6% 的 Spark 作业能够准确的识别数据倾斜(如果存在)。

3.2 提高倾斜数据的切分均匀程度

由于 HighlyCompressMapStatus 用平均值填充所有低于 spark.shuffle.accurateBlockThreshold 的值,每个 ReduceTask 通过压缩后的 MapStatus 累加计算得到的总数据大小和数据分布,就和实际差距很大。

举个简单的例子:我们得到 ReduceTask0 的实际总数据是 1G,而中位数是 100M,因此我们的期望是将 ReduceTask0 拆成 10 份,每一份是 100M。此时上游的 MapStage 一共有 100 个 MapTask,除了 MapTask0 中属于 ReduceTask0 的数据是 100M,其他 99 个 MapStak 的数据都是 10M。当我们将所有的 MapStatus 压缩之后,AQE 获取的 ReduceTask0 的上游分布,就是 MapTask0 有 100M (因为大块数据所以被保留),其他 99 个 MapTask 的数据都是 1M(在压缩时使用平均值填充)。这时,Spark AQE 按照 100M 的期望值来切分,只会切分成两个 ReduceTask:ReduceTask0-0(读取MapTask0)和 ReduceTask0-1(读取剩下99个MapTask)。

基于此,我们改进后的方法是利用精确的 ReduceTask 数据量来反推每个 MapperTask 对应的数据量,得到尽可能准确的数据分布。同样是刚才的例子,我们已知 ReduceTask0 的实际总数据是 1G,MapStatus 压缩的阈值是 100M,那么可以确定的是,MapTask0 关于 ReduceTask0 的数据 100M 是准确被保留的(因为大于等于阈值),而其他 99 个 MapTask 的数据都是不准确的。此时 AQE 就不会使用被压缩的数据,而是通过 1G 的总数据反推得到其他 99 个 MapTask 中属于 ReduceTask0 的数据是 10M,虽然同样是存在误差的平均值,但是相比压缩数据,通过准确的总量反推得到的平均值会更加准确。这个时候 Spark 按照 100M 的期望值来切分,就会切成 10 个 ReduceTask,符合我们的预期。

而在实际应用中,利用新方案,AQE SkewedJoin 切分倾斜数据更加平均,优化效果有明显的提升。

下图是某个倾斜处理效果不理想的作业,SkewedJoin 生效后,该 Stage ShuffleReadSize 的中位数和最大值分别为 4M 和 9.9G。

经过我们的优化后,该 Stage 的 ShuffleReadSize 的中位数和最大值分别为 149M 和 1427M,倾斜分区的切分更加均匀,该 Stage 的运行时间也由原来的 2h 降为 20m。

3.3 支持更多的场景

场景1:JoinWithAggOrWin

以下图为例,Stage10 虽然只有一个 SortMergeJoin,但是 join 的一边并不是 Sort+Exchange 的组合,而是存在 Aggregate 算子或者 Window 算子,因此不属于社区实现的范围内。

场景2:MultipleSkewedJoin

在用户的业务逻辑中,经常出现这样一种场景:一张表的主键需要连续的 join 多张表,这种场景体现在 Spark 的具体执行上,就是连续的 join 存在于同一个 Stage 当中。如下图所示 Stage21 中存在连续的多个 SortMergeJoin,而这种场景也是社区的实现无法优化的。

场景3:JoinWithUnion

Stage 中有 Union 算子,且 Union 的 children 中有 SMJ。

此外,我们还支持了 ShuffleHashJoin、 BucketJoin、MultipleJoinWithAggOrWin 等更多场景。

4. 字节的实践

上面介绍的 LAS 对 Spark AQE SkewedJoin 的优化功能在字节跳动内部已使用 1 年左右,截止 2022年8月,优化日均覆盖1.8万+ Spark 作业,优化命中作业平均性能提升 35% 左右,其中 30% 被优化的 Spark 作业所属于的场景是 LAS 自研支持的,大家可以通过火山引擎开通 LAS 服务并体验这些优化功能。

5. 用户指南

5.1 哪些场景 AQE SkewedJoin 不支持

AQE SkewedJoin 功能并不能处理所有发生数据倾斜的 Join,这是由它的实现逻辑所决定的。

第一,如果倾斜的分区的大部分数据来自于上游的同一个 Mapper,AQE SkewedJoin 无法处理,原因是 Spark 不支持 Reduce Task 只读取上游 Mapper 的一个 block 的部分数据。

第二,如果 Join 的发生倾斜的一侧存在 Agg 或者 Window 这类有指定 requiredChildDistribution 的算子,那么 SkewedJoin 优化无法处理,因为将分区切分会破坏 RDD 的 outputPartitioning,导致不再满足 requiredChildDistribution。

第三,对于 Outer/Semi Join,AQE SkewedJoin 是无法处理非 Outer/Semi 侧的数据倾斜。比如,对于 LeftOuter Join,SkewedJoin 无法处理右侧的数据倾斜。

第四,AQE 无法处理倾斜的 BroadcastHashJoin。

5.2 AQE SkewedJoin 优化效果不明显时的措施

如果遇到了符合应用场景但是 SkewedJoin 没有生效或者倾斜处理效果不理想的情况,有以下调优手段:

  • 提高spark.shuffle.minNumPartitionsToHighlyCompress,保证值大于等于 shuffle 并发(当开启 AQE 时,即为spark.sql.adaptive.coalescePartitions.initialPartitionNum)。
  • 调小spark.shuffle.accurateBlockThreshold,比如 4M。但是需要注意的是,这会增加 Driver 的内存消耗,需要同步增加 Driver 的 cpu 和内存。
  • 降低spark.sql.adaptive.skewJoin.skewedPartitionFactor,降低定义发生倾斜的阈值。

6. 总结

本文首先简单介绍了 Spark AQE 的基本思想以及 SkewedJoin 功能的原理,接着提出了我们在应用 SkewedJoin的过程中遇到的一些问题。针对这些问题,我们介绍了对 AQE SkewedJoin 做的优化和增强——提高统计的准确度;提高倾斜数据的切分均匀程度;支持了更多的场景。接着,本文介绍了 AQE SkewedJoin 在字节跳动的使用情况,包括日均优化覆盖作业和优化效果,其中30%被优化的 Spark 作业所属于的场景是字节自研支持的。最后分享了我们关于 AQE SkewedJoin 的用户指南:哪些场景 AQE SkewedJoin 不支持;当 AQE SkewedJoin 效果不明显时,可以采取哪些措施。

7. 附录A :本文涉及的关于 AQE SkewedJoin 优化的相关参数配置

参数配置名

默认值

参数意义

spark.shuffle.minNumPartitionsToHighlyCompress

2000

决定 Mapstatus 使用 HighlyCompressedMapStatus还是 CompressedMapStatus 的阈值,如果 huffle partition 大于该值,则使用 HighlyCompressedMapStatus。

spark.shuffle.accurateBlockThreshold

100M

HighlyCompressedMapStatus 中记录 shuffle blcok 准确大小的阈值,当 block 小于该值则用平均值代替。

spark.sql.adaptive.skewJoin.skewedPartitionFactor

10

如果一个 partition 大于该因子乘以分区大小的中位数,那么它就是倾斜的 partition。

有关Spark AQE SkewedJoin 在字节跳动的实践和优化的更多相关文章

  1. ruby-on-rails - 使用 Ruby on Rails 进行自动化测试 - 最佳实践 - 2

    很好奇,就使用ruby​​onrails自动化单元测试而言,你们正在做什么?您是否创建了一个脚本来在cron中运行rake作业并将结果邮寄给您?git中的预提交Hook?只是手动调用?我完全理解测试,但想知道在错误发生之前捕获错误的最佳实践是什么。让我们理所当然地认为测试本身是完美无缺的,并且可以正常工作。下一步是什么以确保他们在正确的时间将可能有害的结果传达给您? 最佳答案 不确定您到底想听什么,但是有几个级别的自动代码库控制:在处理某项功能时,您可以使用类似autotest的内容获得关于哪些有效,哪些无效的即时反馈。要确保您的提

  2. 叮咚买菜基于 Apache Doris 统一 OLAP 引擎的应用实践 - 2

    导读:随着叮咚买菜业务的发展,不同的业务场景对数据分析提出了不同的需求,他们希望引入一款实时OLAP数据库,构建一个灵活的多维实时查询和分析的平台,统一数据的接入和查询方案,解决各业务线对数据高效实时查询和精细化运营的需求。经过调研选型,最终引入ApacheDoris作为最终的OLAP分析引擎,Doris作为核心的OLAP引擎支持复杂地分析操作、提供多维的数据视图,在叮咚买菜数十个业务场景中广泛应用。作者|叮咚买菜资深数据工程师韩青叮咚买菜创立于2017年5月,是一家专注美好食物的创业公司。叮咚买菜专注吃的事业,为满足更多人“想吃什么”而努力,通过美好食材的供应、美好滋味的开发以及美食品牌的孵

  3. Ruby - 如何将消息长度表示为 2 个二进制字节 - 2

    我正在使用Ruby,我正在与一个网络端点通信,该端点在发送消息本身之前需要格式化“header”。header中的第一个字段必须是消息长度,它被定义为网络字节顺序中的2二进制字节消息长度。比如我的消息长度是1024。如何将1024表示为二进制双字节? 最佳答案 Ruby(以及Perl和Python等)中字节整理的标准工具是pack和unpack。ruby的packisinArray.您的长度应该是两个字节长,并且按网络字节顺序排列,这听起来像是n格式说明符的工作:n|Integer|16-bitunsigned,network(bi

  4. ruby-on-rails - Rails 中同一个类的多个关联的最佳实践? - 2

    我认为我的问题最好用一个例子来描述。假设我有一个名为“Thing”的简单模型,它有一些简单数据类型的属性。像...Thing-foo:string-goo:string-bar:int这并不难。数据库表将包含具有这三个属性的三列,我可以使用@thing.foo或@thing.bar之类的东西访问它们。但我要解决的问题是当“foo”或“goo”不再包含在简单数据类型中时会发生什么?假设foo和goo代表相同类型的对象。也就是说,它们都是“Whazit”的实例,只是数据不同。所以现在事情可能看起来像这样......Thing-bar:int但是现在有一个新的模型叫做“Whazit”,看起来

  5. ruby-on-rails - 向 Rails 3 添加 Ruby 扩展方法的最佳实践? - 2

    我有一个要在我的Rails3项目中使用的数组扩展方法。它应该住在哪里?我有一个应用程序/类,我最初把它放在(array_extensions.rb)中,在我的config/application.rb中我加载路径:config.autoload_paths+=%W(#{Rails.root}/应用程序/类)。但是,当我转到railsconsole时,未加载扩展。是否有一个预定义的位置可以放置我的Rails3扩展方法?或者,一种预先定义的方式来添加它们?我知道Rails有自己的数组扩展方法。我应该将我的添加到active_support/core_ext/array/conversion

  6. Ruby 最佳实践 : working with classes - 2

    参见下面的示例,我想最好使用第二种方法,但第一种也可以。哪种方法最好,使用另一种的后果是什么?classTestdefstartp"started"endtest=Test.newtest.startendclassTest2defstartp"started"endendtest2=Test2.newtest2.start 最佳答案 我肯定会说第二种变体更有意义。第一个不会导致错误,但对象实例化完全过时且毫无意义。外部变量在类的范围内不可见:var="string"classAvar=A.newendputsvar#=>strin

  7. ruby - Sinatra:哈希的未定义方法字节大小 - 2

    很难说出这里要问什么。这个问题模棱两可、含糊不清、不完整、过于宽泛或夸夸其谈,无法以目前的形式得到合理的回答。如需帮助澄清此问题以便重新打开,visitthehelpcenter.关闭9年前。我正在创建一个Sinatra应用程序,它采用上传的CSV文件并将其内容放入哈希中。当我像这样在我的app.rb中引用这个散列时:hash=extract_values(path_to_filename)我不断收到此错误消息:undefinedmethod`bytesize'forHash:0x007fc5e28f2b90#object_idfile:utils.rblocation:bytesiz

  8. ruby - 存储外部 API 的密码 - 最佳实践 - 2

    如果我构建了一个应用程序来访问来自Gmail、Twitter和Facebook的一些数据,并且我希望用户只需输入一次他们的身份验证信息,并且在几天或几周后重置,那会怎样是在Ruby中动态执行此操作的最佳方法吗?我看到很多人只是拥有他们客户/用户凭证的配置文件,如下所示:gmail_account:username:myClientpassword:myClientsPassword这看起来a)非常不安全,b)如果我想为成千上万的用户存储此类信息,它就无法工作。推荐的方法是什么?我希望能够在这些服务之上构建一个界面,因此每次用户进行交易时都必须输入凭据是不可行的。

  9. ruby-on-rails - 使用设计身份验证的 API 访问 - 最佳实践? - 2

    我正在使用Devise在Rails应用程序中,并希望通过API公开一些模型数据,但应该像应用程序一样限制对API的访问。$curlhttp://myapp.com/api/v1/sales/7.json{"error":"Youneedtosigninorsignupbeforecontinuing."}很明显。在这种情况下是否有访问API的最佳实践?我更喜欢一步验证+获取数据,但这只是为了让客户的工作更轻松。他们将使用JQuery在客户端提取数据。感谢您提供任何信息!凡妮莎 最佳答案 我建议您按照以下帖子中的选项2:使用APIke

  10. ruby-on-rails - 在多个页面上使用相同表单的 Rails 最佳实践 - 2

    我正在开发一个Rails2.3.1网站。在整个网站中,我需要一个用于在各种页面(主页、创建帖子页面、帖子列表页面、评论列表页面等)上创建帖子的表单——只要说这个表单需要在由各种Controller)。这些页面中的每一个都显示在相应的Controller/操作中检索到的各种其他信息。例如,主页列出了最新的10篇文章、从数据库中提取的内容等。因此,我已将帖子创建表单移动到它自己的部分中,并将该部分包含在所有必要的页面中。请注意,部分POST中的表单到/questions(路由到PostsController::create——这是默认的Rails行为)。我遇到的问题是当Posts表单没有正

随机推荐