Spark:控制分区以减少混洗



我正试图了解Spark中partitioning数据帧的不同方式,以减少特定管道上的混洗量。

这是我正在处理的数据帧,它包含40多亿行和80列:

+-----+-------------------+-----------+
|  msn|          timestamp| Flight_Id |
+-----+-------------------+-----------+
|50020|2020-08-22 19:16:00|       72.0|
|50020|2020-08-22 19:15:00|       84.0|
|50020|2020-08-22 19:14:00|       96.0|
|50020|2020-08-22 19:13:00|       84.0|
|50020|2020-08-22 19:12:00|       84.0|
|50020|2020-08-22 19:11:00|       84.0|
|50020|2020-08-22 19:10:00|       84.0|
|50020|2020-08-22 19:09:00|       84.0|
|50020|2020-08-22 19:08:00|       84.0|
|50020|2020-08-22 19:07:00|       84.0|
|50020|2020-08-22 19:06:00|       84.0|
|50020|2020-08-22 19:05:00|       84.0|
|50020|2020-08-22 19:04:00|       84.0|
|50020|2020-08-22 19:03:00|       84.0|
|50020|2020-08-22 19:02:00|       84.0|
|50020|2020-08-22 19:01:00|       84.0|
|50020|2020-08-22 19:00:00|       84.0|
|50020|2020-08-22 18:59:00|       84.0|
|50020|2020-08-22 18:58:00|       84.0|
|50020|2020-08-22 18:57:00|       84.0|
+-----+-------------------+-----------+

这代表了不同飞机(总共41架飞机)的时间序列集合。我对这些数据只做了两件事:

  1. 使用由MSNFlight_ID划分的窗口和由timestamp划分的order By进行过滤,以保持每次飞行的最后30分钟
  2. 在剩下的列上,计算meanstdev,并对数据进行归一化

我有32个执行器,每个执行器有12g内存,运行30小时后作业崩溃,并显示以下消息:

The driver running the job crashed, ran out of memory, or otherwise became unresponsive while it was running.

查看查询计划,我注意到我有300多个步骤,其中60多个步骤涉及洗牌(所有步骤物理计划看起来完全相同):

AdaptiveSparkPlan(isFinalPlan=false)
+- CollectLimit 1
+- HashAggregate(keys=[], functions=[avg(3546001_421#213), stddev_samp(3546001_421#213)], output=[avg(3546001_421)#10408, stddev_samp(3546001_421)#10417])
+- Exchange SinglePartition, true
+- HashAggregate(keys=[], functions=[partial_avg(3546001_421#213), partial_stddev_samp(3546001_421#213)], output=[sum#10479, count#10480L, n#10423, avg#10424, m2#10425])
+- Project [3546001_421#213]
+- Filter (isnotnull(rank#10238) && (rank#10238 <= 1800))
+- Window [rank(timestamp#10081) windowspecdefinition(Flight_Id_Int#209, timestamp#10081 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10238], [Flight_Id_Int#209], [timestamp#10081 DESC NULLS LAST]
+- Sort [Flight_Id_Int#209 ASC NULLS FIRST, timestamp#10081 DESC NULLS LAST], false, 0
+- ShuffleQueryStage 0
+- Exchange hashpartitioning(Flight_Id_Int#209, 200), true
+- Union
:- *(1) Project [Flight_Id_Int#209, cast((cast(timestamp#212L as double) / 1.0E9) as timestamp) AS timestamp#10081, 3546001_421#213]

我有一种强烈的感觉,首先通过msn进行分区将有助于减少混洗的数量,因为大多数工作都在msn级别。

我的问题是如何在我的代码中我应该在哪里重新分区?如果我使用repartitionrepartition和密钥hash partitioning,我一直在阅读关于这个不同分区的文档,我对它们的使用以及这是否真的是我问题的解决方案感到困惑。

谢谢。

编辑1:

以下是合乎逻辑的计划:

GlobalLimit 1
+- LocalLimit 1
+- Aggregate [avg(3566000_421#214) AS avg(3566000_421)#10594, stddev_samp(3566000_421#214) AS stddev_samp(3566000_421)#10603]
+- Project [3566000_421#214]
+- Filter (isnotnull(rank#10238) && (rank#10238 <= 1800))
+- Window [rank(timestamp#10081) windowspecdefinition(msn#208, Flight_Id_Int#209, timestamp#10081 DESC NULLS LAST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS rank#10238], [msn#208, Flight_Id_Int#209], [timestamp#10081 DESC NULLS LAST]
+- Union
:- Project [msn#208, Flight_Id_Int#209, cast((cast(timestamp#212L as double) / 1.0E9) as timestamp) AS timestamp#10081, 3566000_421#214]

这是代码的一部分,我从存储数据的数据湖中提取数据。仅供参考,这是通过使用名为FoundryTS的库的自定义API完成的。重要的是,在我调用to_dataframe()方法之前,不会收集任何数据。我在每个msn上循环以避免调用过大,然后我将所有数据帧与unionByName合并在一起

# Loop over MSN to extract timeseries
df = []
for msn in msn_range:
search_results = (SeriesMetadata.M_REPORT_NUMBER == report_number) & (SeriesMetadata.M_AIRCRAFT == msn)
# Create the intervals to split TimeSeries extract by flight for each MSN
Start_int = list(df1.where(F.col("msn") == msn).select("Start").toPandas()["Start"])
End_int = list(df1.where(F.col("msn") == msn).select("End").toPandas()["End"])
flight_id = list(df1.where(F.col("msn") == msn).select("id_cmsReport").toPandas()["id_cmsReport"])
flights_interval = [Interval(
start, end, name=flight_Id
) for start, end, flight_Id in zip(
Start_int, End_int, flight_id
)]
""" Collect all the series in a node collections """
output = fts.search.series(
search_results,
object_types=["export-control-us-ear99-a220-dal-airline-series"])
.map_by(FF.interpolate(
before='nearest',
internal='nearest',
after='nearest',
frequency=frequency,
rename_columns_by=lambda x: x.metadata["parameter_id"] + "_" + x.metadata["report_number"]),
keys='msn') 
.map_intervals(flights_interval, interval_name='Flight_Id_Int')
.map(FF.time_range(period_start, period_end))
.to_dataframe()  # !!!!  numPartitions=32  Foundry Doc : #partition = #executors see if it triggers OOM error
df.append(output)
output = df[0]
for df in df[1:]:
output = output.unionByName(df)  # Same as union but matches name instead of columns order.
# Repartition by msn to improve latter calculation
N = len(msn_range)
output.repartition(N, 'msn')

"运行作业的驱动程序在运行过程中崩溃、内存不足或无响应">

你需要解决的第一个问题是增加驱动程序(而不是执行程序)的内存。spark中的默认驱动程序内存通常太低,以至于在许多查询中都会崩溃。

"我的问题是在我的代码中我应该如何以及在哪里重新划分";

Spark已经完成了根据需要添加重新分区的工作。很可能您只会在执行的中途手动重新划分数据来创建额外的工作。一种潜在的优化是将数据存储在带区块的表中,但这只会潜在地删除第一个交换,并且只有当您的区块列与第一个交换的哈希分区完全匹配时。

"查看查询计划,我注意到我有300多个步骤";

你上面描述的不需要300步。这里好像有什么不对劲。您的优化逻辑计划是什么样子的?平均值和std应该只需要扫描->部分agg->交换->最终agg。在您提供的查询计划中,您似乎有意只查看最后1600个数据点,而不是最后30m。你是说要做一个窗口函数而不是一个简单的聚合(又称分组)吗?

编辑:

对于msn_range中的msn:

IMO这可能是您问题的一部分。这个for循环会导致执行计划非常大,这可能就是您在驱动程序上遇到OOM问题的原因。你也许可以把它翻译成更适合火花的东西,而不需要在驱动程序上将forloop转换成火花上做那么多工作。parallelize(…).map(/你的代码/)

对于那些可能有所帮助的人,

以下是我在分区中出错的地方:

  1. .to_dataframe():默认情况下,在我们的云平台Spark中创建200个分区。因此,通过在40msn上循环,我生成了40x200分区。我最后有许多小任务要处理
  2. .repartition():由于我在msn上使用WindowpartitionBy,因此使用msn进行重新分区将加快这一步骤。但它引入了我的分区的完全洗牌

结果:根据Spark Job Tracker和>55000项任务。任务占用了一些开销,这可以解释司机撞车的原因。

我做了什么让它工作:

  1. 我去掉了Window函数

在从DataLake获取数据之前,在流程的早期进行过滤。我直接提取了我需要的那部分航班。因此,物理计划中完全相同部分的Exchange Partition更少。

这是最新的物理计划:

AdaptiveSparkPlan(isFinalPlan=false)
+- CollectLimit 1
+- HashAggregate(keys=[], functions=[avg(3565000_421#213), stddev_samp(3565000_421#213)], output=[avg(3565000_421)#10246, stddev_samp(3565000_421)#10255])
+- ShuffleQueryStage 0
+- Exchange SinglePartition, true
+- *(43) HashAggregate(keys=[], functions=[partial_avg(3565000_421#213), partial_stddev_samp(3565000_421#213)], output=[sum#10317, count#10318L, n#10261, avg#10262, m2#10263])
+- Union
:- *(1) Project [3565000_421#213]
:  +- *(1) Scan ExistingRDD[msn#208,Flight_Id_Int#209,Flight_Id_Int.start#210L
  1. 我减少了分区数量:

我在.to_dataframe()调用中将其任意设置为5,用于40个msn中的每一个。

24小时后构建成功。1.1MB的无序写入和>27任务。

正如@Andrew Long所指出的,由于for循环,它可能是次优的。我仍在为32个执行器生成至少200个分区,结果是>27k要管理的任务。

最后:

作为最后一步,我通过依赖底层API来获取1个大数据帧中的数据,并将分区强制为32,从而摆脱了for循环。截至本文撰写之时,Build仍在运行,我将根据结果编辑帖子。但要管理的任务确实要少得多(乘以因子4)。

编辑1-更新

很高兴地报告,通过消除for循环并将数据帧划分为64个分区(32个执行器x 2个内核),我只需11小时(而不是24小时)就可以完成相同的工作,只需1.9MB的Shuffle写入和5k个任务。

附言:我在上面提到了32个(而不是64个)分区,但32个分区的工作没有成功,并且具有次优并行性(<20),所以它运行的时间更长,我有空闲的执行器。64对我来说似乎是个好地方。

最新更新