如何通过聚合来提高执行时间以计算spark中的百分位数



我正在尝试设置一个pyspark作业,该作业估计每天约700GB数据量的p25、p50、p75和p90。我用40个工作节点运行这个作业,每个工作节点有32G内存和8vCPU,但最终要运行大约15个小时才能完成。我假设延迟是由于需要在节点之间对值进行排序以计算百分位数。有没有一种替代方案可以加快这一进程?

输入数据模式-

root
|-- processed_date: date (nullable = true)
|-- id: string (nullable = true)
|-- experiment: string (nullable = true)
|-- type: string (nullable = true)
|-- value: double (nullable = true)
|-- revision: string (nullable = true)
|-- source: string (nullable = true)
|-- region: string (nullable = true)
df_agg = df.groupby('processed_date', 'id', 'experiment', 'type').agg(
F.min('value').alias('min'),
F.max('value').alias('max'),
F.avg('value').alias('avg'),
F.expr('percentile(value, 0.25)').alias('p25'),
F.expr('percentile(value, 0.50)').alias('p50'),
F.expr('percentile(value, 0.75)').alias('p75'),
F.expr('percentile(value, 0.90)').alias('p90'))

谢谢!

只使用列来重新分区,意味着它使用spark.sql.shuffle.partitions对表达式中使用的列使用哈希分区器,因此在默认的shuffle分区不够的情况下,这将无法正常工作。(默认为200(

u应设置numPartitions as well as column expressions。对于这种情况,我想做这样的事:

df=df.repartition(1000, *['processed_date', 'id', 'experiment', 'type']) 

或者在应用重新分区(仅使用列(之前,设置shuffle分区:

spark.conf.set("spark.sql.shuffle.partitions",1000)
df=df.repartition(*['processed_date', 'id', 'experiment', 'type'])`

我建议您在应用groupby之前重新分区并溢出到磁盘,以便使用adequate partitioning and in-memory computing(确保单程(:

使用溢出到磁盘的数据仍然比根本不放入内存更快

from pyspark.storagelevel import StorageLevel
df=df.repartition(1000, *['processed_date', 'id', 'experiment', 'type'])
.persist(StorageLevel.MEMORY_AND_DISK)

NumPartitionsworkers * cores * (2 or 3)计算(因为几乎所有现代虚拟内核都是多线程的(,得出8*40*3=960,我将其四舍五入为1000

您可以尝试通过对列上的数据帧DataFrame.repartition进行重新分区

df = df.repartition('processed_date', 'id', 'experiment', 'type')

这样,与上述列的组合相关的所有记录都将位于同一节点中。

最新更新