我在EMR上使用Spark SQL v2.4.7(使用YARN)。我编写Spark Sql查询来执行转换。
估计复杂查询的最佳混洗分区数:
我正在尝试估计需要设置的最佳混排分区数,以便为具有多个联接的复杂查询获得最佳性能。在互联网上,我发现分区的最佳大小应该在10 MB
-100 MB
的范围内。现在,由于我知道这个值,我的下一步是计算查询的数据混洗量(以MB为单位),然后将其除以100
,得到混洗分区数。然而,对于一个涉及多个具有大表的联接的复杂查询,估计洗牌量变得极其困难
那么,我如何估计混洗量,以及大查询所需的最佳混合分区数?目前(经过大量搜索)我正在执行以下步骤-
scala> spark.sql("""
| create temp view initial_tbl
| as
| with first_qry as
| (
| select a.id,
| a.fund_id,
| b.fname,
| c.state_nm,
| row_number() over (partition by a.id order by c.eff_dt desc) as rownum
| from tblA a
| left join tblB b
| on a.id = b.id
| left join tblC c
| on b.st_id = c.st_id
| )
| select * from first_qry
| where rownum = 1
| """)
scala> spark.sql("""
| create temp view final_tbl as
| select a.id, a.fname, a.state_nm, b.salary, c.age
| from initial_tbl a
| left join fin_dtls b
| on a.id = b.id
| and a.fund_id = b.fund_id
| left join age_dtls c
| on b.age_id = c.age_id
| union all
| select id, fname, 'NA' as state_nm, salary, age
| from another_pre_created_tbl
| """)
scala> spark.sql("""
| select * from final_tbl
| limit 50
| """)
注意:这只是实际查询的简化版本。
好的,现在,我正在尝试估计这个查询的数据大小,然后我可以将其除以100 MB
,以获得查询的最佳shuffle分区数。
scala> val df = spark.read.table("final_tbl")
scala> println(df.queryExecution.analyzed.stats)
Statistics(sizeInBytes=34.5 GB, hints=none)
因此,上述查询的大小是34.5 GB
,当除以100 MB
时,它给出~ 350
shuffle分区。现在,在设置了配置SET spark.sql.shuffle.partitions=350
之后,我仍然看到查询速度很慢。所以我的问题是
- 我这样做对吗?否则,请告诉我如何计算复杂查询(涉及多个联接)的混洗量,并且最终能够为任何给定的复杂查询计算混洗分区的最佳数量
SKEW:
对于上面提到的查询,我看到12
作业是在Spark UI中触发的。在UI中,最后一个作业显示高偏斜,即其中一个任务是一个长条形,其他同时执行的任务由几个非常小的条形表示(我希望我能提供UI屏幕截图)-所以,我的问题(基于以上)是-
- 如何识别上述查询的哪一部分,或者具体地一般来说,这个大型复杂查询中的哪个表/列是主要的造成偏斜的罪魁祸首?在一个大查询中有这么多表和联接通过一次连接2个表并检查UI和进度来进行测试变得非常困难和耗时
那么,有什么聪明的方法可以找出导致偏斜的实际联接表/列吗 - 此外,在确定了导致偏斜的表/列之后,我如何解决这个问题,使所有分区都有相等的数据量来处理,从而加快作业速度
- 如何将UI中的特定作业(触发火花)与查询的哪个特定部分
写入输出时倾斜:
最后,我将上述查询的输出作为-写入SQL API(%sql%
)中的S3
create table final_out
using parquet
options (
path 's3:/my/test/location/',
mode: 'overwrite'
)
as
select * from final_tbl
distribute by id;
即使是这样,当我检查UI时,我也会发现像上面这样的巨大倾斜,一个任务是一个很长的条,而其他同时执行的任务是很小的条。如果仔细观察,您会发现上面显示的最终查询使用另一个具有硬编码值的查询(即'NA' as state_nm
)执行union all
。现在,由于union
-ed表中大约有100 Million
条记录,因此值'NA'
成为输出中列state_nm
的主导值,从而产生偏斜,从而使写入非常缓慢。
所以我的最后一个问题是
- 在以拼花地板的形式写入磁盘时,如何减轻输出中硬编码值造成的这种偏移文件(使用sql API)?请注意,我曾尝试将最终输出数据集
repartition
放在它的PK列(id
)上,以在写入输出之前强制对记录进行均匀分布,但没有成功——请注意上面给出的create table
语句末尾的distribute by id
部分
我的集群配置如下:
Nodes: 20
Cores: 8
Memory: 64 GB
我对这篇冗长的帖子感到非常抱歉,但这些问题困扰了我很长时间。我在网上搜索了很多,但找不到任何具体的答案。有人能帮我解决这些问题吗。感谢您的帮助。
谢谢。
无法回答您所有的问题,但我可以分享一些想法,因为我遇到了一些问题:
如何识别上述查询的哪一部分,或者具体地一般来说,这个大型复杂查询中的哪个表/列是主要的造成偏斜的罪魁祸首?
您可以列出查询中的所有表,对用于连接它们的列进行计数,并查看哪些值表示行的过大部分。若要实现自动化,可以使用pandasprofiler或期望值库为每个表自动生成列的摘要。
我这样做对吗?否则,请告诉我如何计算复杂查询(涉及多个联接)的搅乱量,以及最终能够计算shuffle分区的最佳数量对于任何给定的复杂查询。
我不确定在混洗分区设置方面还有更多的工作要做,唯一想到的就是在执行查询之前计算最大表的大小,并使用它来使用spark.conf.set("spark.sql.shuffle.partitions", calculatedNumber)
动态计算/估计混洗分区数,但我不相信这会有帮助。
根据我的经验,更大的好处应该来自于缓存多次使用的表、广播较小的表以及在运行查询之前在联接列上划分较大的数据帧。
至于编写,我怀疑不是编写它的ef会使过程变慢,而是在编写之前执行最终查询的整个计算(延迟执行),这需要大部分时间。