在Spark SQL查询中计算最优无序分区和减少偏斜



我在EMR上使用Spark SQL v2.4.7(使用YARN)。我编写Spark Sql查询来执行转换。

估计复杂查询的最佳混洗分区数
我正在尝试估计需要设置的最佳混排分区数,以便为具有多个联接的复杂查询获得最佳性能。在互联网上,我发现分区的最佳大小应该在10 MB-100 MB的范围内。现在,由于我知道这个值,我的下一步是计算查询的数据混洗量(以MB为单位),然后将其除以100,得到混洗分区数。然而,对于一个涉及多个具有大表的联接的复杂查询,估计洗牌量变得极其困难
那么,我如何估计混洗量,以及大查询所需的最佳混合分区数?目前(经过大量搜索)我正在执行以下步骤-

scala> spark.sql("""
| create temp view initial_tbl
| as
| with first_qry as 
| (
| select a.id, 
| a.fund_id,
| b.fname, 
| c.state_nm, 
| row_number() over (partition by a.id order by c.eff_dt desc) as rownum
| from tblA a
| left join tblB b
| on a.id = b.id
| left join tblC c
| on b.st_id = c.st_id
| )
| select * from first_qry
| where rownum = 1
| """)
scala> spark.sql("""
| create temp view final_tbl as
| select a.id, a.fname, a.state_nm, b.salary, c.age
| from initial_tbl a
| left join fin_dtls b
| on a.id = b.id
| and a.fund_id = b.fund_id
| left join age_dtls c
| on b.age_id = c.age_id
| union all
| select id, fname, 'NA' as state_nm, salary, age
| from another_pre_created_tbl
| """)
scala> spark.sql("""
| select * from final_tbl 
| limit 50
| """)

注意:这只是实际查询的简化版本。

好的,现在,我正在尝试估计这个查询的数据大小,然后我可以将其除以100 MB,以获得查询的最佳shuffle分区数。

scala> val df = spark.read.table("final_tbl")
scala> println(df.queryExecution.analyzed.stats)
Statistics(sizeInBytes=34.5 GB, hints=none)

因此,上述查询的大小是34.5 GB,当除以100 MB时,它给出~ 350shuffle分区。现在,在设置了配置SET spark.sql.shuffle.partitions=350之后,我仍然看到查询速度很慢。所以我的问题是

  • 我这样做对吗?否则,请告诉我如何计算复杂查询(涉及多个联接)的混洗量,并且最终能够为任何给定的复杂查询计算混洗分区的最佳数量

SKEW
对于上面提到的查询,我看到12作业是在Spark UI中触发的。在UI中,最后一个作业显示高偏斜,即其中一个任务是一个长条形,其他同时执行的任务由几个非常小的条形表示(我希望我能提供UI屏幕截图)-所以,我的问题(基于以上)是-

  • 如何识别上述查询的哪一部分,或者具体地一般来说,这个大型复杂查询中的哪个表/列是主要的造成偏斜的罪魁祸首?在一个大查询中有这么多表和联接通过一次连接2个表并检查UI和进度来进行测试变得非常困难和耗时
    那么,有什么聪明的方法可以找出导致偏斜的实际联接表/列吗
  • 此外,在确定了导致偏斜的表/列之后,我如何解决这个问题,使所有分区都有相等的数据量来处理,从而加快作业速度
  • 如何将UI中的特定作业(触发火花)与查询的哪个特定部分

写入输出时倾斜:
最后,我将上述查询的输出作为-写入SQL API(%sql%)中的S3

create table final_out
using parquet
options (
path 's3:/my/test/location/',
mode: 'overwrite'
)
as
select * from final_tbl
distribute by id;

即使是这样,当我检查UI时,我也会发现像上面这样的巨大倾斜,一个任务是一个很长的条,而其他同时执行的任务是很小的条。如果仔细观察,您会发现上面显示的最终查询使用另一个具有硬编码值的查询(即'NA' as state_nm)执行union all。现在,由于union-ed表中大约有100 Million条记录,因此值'NA'成为输出中列state_nm的主导值,从而产生偏斜,从而使写入非常缓慢。

所以我的最后一个问题是

  • 在以拼花地板的形式写入磁盘时,如何减轻输出中硬编码值造成的这种偏移文件(使用sql API)?请注意,我曾尝试将最终输出数据集repartition放在它的PK列(id)上,以在写入输出之前强制对记录进行均匀分布,但没有成功——请注意上面给出的create table语句末尾的distribute by id部分

我的集群配置如下:

Nodes: 20
Cores:  8
Memory: 64 GB

我对这篇冗长的帖子感到非常抱歉,但这些问题困扰了我很长时间。我在网上搜索了很多,但找不到任何具体的答案。有人能帮我解决这些问题吗。感谢您的帮助。

谢谢。

无法回答您所有的问题,但我可以分享一些想法,因为我遇到了一些问题:

如何识别上述查询的哪一部分,或者具体地一般来说,这个大型复杂查询中的哪个表/列是主要的造成偏斜的罪魁祸首?

您可以列出查询中的所有表,对用于连接它们的列进行计数,并查看哪些值表示行的过大部分。若要实现自动化,可以使用pandasprofiler或期望值库为每个表自动生成列的摘要。

我这样做对吗?否则,请告诉我如何计算复杂查询(涉及多个联接)的搅乱量,以及最终能够计算shuffle分区的最佳数量对于任何给定的复杂查询。

我不确定在混洗分区设置方面还有更多的工作要做,唯一想到的就是在执行查询之前计算最大表的大小,并使用它来使用spark.conf.set("spark.sql.shuffle.partitions", calculatedNumber)动态计算/估计混洗分区数,但我不相信这会有帮助。

根据我的经验,更大的好处应该来自于缓存多次使用的表、广播较小的表以及在运行查询之前在联接列上划分较大的数据帧。

至于编写,我怀疑不是编写它的ef会使过程变慢,而是在编写之前执行最终查询的整个计算(延迟执行),这需要大部分时间。

相关内容

  • 没有找到相关文章

最新更新