我有一个RDD
,当我使用window
时,结果的RDD' partition
更改为200,当我使用window
时,我不能更改partition
?
这是我的代码:
val rdd= sc.parallelize(List(1,3,2,4,5,6,7,8),4)
val result = rdd.toDF("values").withColumn("csum", sum(col("values")).over(Window.partitionBy(col("values")))).rdd
println(result.getNumPartitions + "rdd2")
我的输入分区是4,为什么结果分区为200?
我希望我的结果分区也为4。
是否有更干净的解决方案?
注意:正如@eliasah所述 - 不可能避免 使用窗口函数与Spark
使用时的重新分配
- 为什么结果分区为200?
spark docspark.sql.shuffle.partitions
的默认值配置用于加入或聚合的数据时要使用的分区数 - 为200
- 我如何重新分配4?
您可以使用:
coalesce(4)
或
repartition(4)
Spark Doc
cocece(NumPartitions(将RDD中的分区数减少到Numpartition。过滤大型数据集后,可用于更有效地运行操作。
repartition(NumPartitions(对RDD中的数据进行重新封装以随机创建更多或更少的分区,并在它们上平衡它们。这总是通过网络上的所有数据调整。
(也将此答案添加到https://stackoverflow.com/a/a/444384638/3415409(
我只是在阅读有关使用GroupBy聚合时控制的分区数量,摘自https://jaceklaskowski.gitbooks.io/mastering-spark-sql/spark-sql/spark-sql-performance-tuning-tuning-tuning-tuning-groupby-aggregation.html,IT似乎可以与窗口一起使用同样的技巧,在我的代码中,我正在定义一个
之类的窗口windowSpec = Window
.partitionBy('colA', 'colB')
.orderBy('timeCol')
.rowsBetween(1, 1)
然后做
next_event = F.lead('timeCol', 1).over(windowSpec)
并通过
创建数据框df2 = df.withColumn('next_event', next_event)
确实,它有200个分区。但是,如果我做
df2 = df.repartition(10, 'colA', 'colB').withColumn('next_event', next_event)
它有10!