由于数据分布不均匀,这个Spark SQL查询的性能很差:
select c.*, coalesce(
sum(revenue)
OVER (PARTITION BY cid, pid, code
ORDER BY (cTime div (1000*3600))
RANGE BETWEEN 336 PRECEDING and 1 PRECEDING), 0L) as totalRevenue
from records c
我在SparkUI中看到,如果我增加扫描范围,单个任务堆栈和集群会失败。
我在AWS EMR使用Yarn,带有Spark 2.2.0
我该如何克服这个问题?感谢
我只能推荐几种方法来缓解您的调查条件。实际上,我会尝试两种不首先处理偏斜的方法:
- 请尝试增加每条消息的执行器内存。在YARN上,您可能还需要增加最大容器内存。Spark IIRC的默认值是2gb,需要增加它并不罕见
- 尝试切换到memory_and_disk或disk_only持久性级别。我相信这应该适用于您的查询,尽管很难查看完整的查询计划
原因是,至少在我看来,您的数据从根本上是扭曲的。如果你开始重塑数据,以特定的方式解决数据当前形状的倾斜问题,那么你将面临维护困难,因为数据的形状可能会随着时间的推移而变化。在我看来,至少您希望尽可能长时间地保留查询的最直接的实现,并且只有在遇到SLA违规等问题时才能通过编程优化偏斜问题。
如果这些都不起作用,那么你可以尝试直接解决偏斜问题一个简单的方法是创建第三列,该列由已知有问题的列值的随机数填充。将其作为键,进行一次求和操作,然后再进行第二次,去掉多余的随机列。或者,您可以执行两个查询并将它们连接起来:一个查询带有倾斜数据的随机数(仍必须在两次过程中处理),另一个查询未更改,用于无问题数据
编辑-通过两个帧计算部分和
这里最有用的观察是加法是交换的和结合的。我最初基于随机数的建议不会奏效,但这会的。基本上,你想计算你想要的帧的几个部分的部分和。最简单的方法可能是作为一组范围(为了简单起见,这里使用了两个):
create temporary table partial_revenue_1 as select c.*, coalesce(
sum(revenue)
OVER (PARTITION BY cid, pid, code
ORDER BY (cTime div (1000*3600))
RANGE BETWEEN 336 PRECEDING and 118 PRECEDING), 0L) as partialTotalRevenue
from records c
create temporary table partial_revenue_2 as select c.*, coalesce(
sum(revenue)
OVER (PARTITION BY cid, pid, code
ORDER BY (cTime div (1000*3600))
RANGE BETWEEN 117 PRECEDING and 1 PRECEDING), 0L) as partialTotalRevenue
from records c
create temporary table combined_partials as select * from
partial_reveneue_1 union all select * from partial_revenue_2
select sum(partialTotalRevenue), first(c.some_col) ... from
combined_partials c group by cid, pid, code
请注意,您需要使用first
聚合函数来剔除records
表上早期select *
操作中的重复字段。别担心,这会很好,因为两个值都来自同一个表。