当我使用 window.partitionBy() 函数与 spark/scala 一起使用时,如何保持分区编号不变?



我有一个RDD,当我使用window时,结果的RDD' partition更改为200,当我使用window时,我不能更改partition

这是我的代码:

val rdd= sc.parallelize(List(1,3,2,4,5,6,7,8),4)
val result = rdd.toDF("values").withColumn("csum", sum(col("values")).over(Window.partitionBy(col("values")))).rdd
println(result.getNumPartitions + "rdd2")

我的输入分区是4,为什么结果分区为200?

我希望我的结果分区也为4。

是否有更干净的解决方案?

注意:正如@eliasah所述 - 不可能避免 使用窗口函数与Spark

使用时的重新分配

  • 为什么结果分区为200?

spark docspark.sql.shuffle.partitions的默认值配置用于加入或聚合的数据时要使用的分区数 - 为200

  • 我如何重新分配4?

您可以使用:

coalesce(4)

repartition(4)

Spark Doc

cocece(NumPartitions(将RDD中的分区数减少到Numpartition。过滤大型数据集后,可用于更有效地运行操作。

repartition(NumPartitions(对RDD中的数据进行重新封装以随机创建更多或更少的分区,并在它们上平衡它们。这总是通过网络上的所有数据调整。

(也将此答案添加到https://stackoverflow.com/a/a/444384638/3415409(

我只是在阅读有关使用GroupBy聚合时控制的分区数量,摘自https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql/spark-sql-performance-tuning-tuning-tuning-tuning-groupby-aggregation.html,IT似乎可以与窗口一起使用同样的技巧,在我的代码中,我正在定义一个

之类的窗口
windowSpec = Window 
    .partitionBy('colA', 'colB') 
    .orderBy('timeCol') 
    .rowsBetween(1, 1)

然后做

next_event = F.lead('timeCol', 1).over(windowSpec)

并通过

创建数据框
df2 = df.withColumn('next_event', next_event)

确实,它有200个分区。但是,如果我做

df2 = df.repartition(10, 'colA', 'colB').withColumn('next_event', next_event)

它有10!

最新更新