如何在Spark DataFrame中(同样)分区阵列数据



i具有以下形式的数据框架:

import scala.util.Random
val localData = (1 to 100).map(i => (i,Seq.fill(Math.abs(Random.nextGaussian()*100).toInt)(Random.nextDouble)))
val df = sc.parallelize(localData).toDF("id","data")
|-- id: integer (nullable = false)
|-- data: array (nullable = true)
|    |-- element: double (containsNull = false)

df.withColumn("data_size",size($"data")).show
+---+--------------------+---------+
| id|                data|data_size|
+---+--------------------+---------+
|  1|[0.77845301260182...|      217|
|  2|[0.28806915178410...|      202|
|  3|[0.76304121847720...|      165|
|  4|[0.57955190088558...|        9|
|  5|[0.82134215959459...|       11|
|  6|[0.42193739241567...|       57|
|  7|[0.76381645621403...|        4|
|  8|[0.56507523859466...|       93|
|  9|[0.83541853717244...|      107|
| 10|[0.77955626749231...|      111|
| 11|[0.83721643562080...|      223|
| 12|[0.30546029947285...|      116|
| 13|[0.02705462199952...|       46|
| 14|[0.46646815407673...|       41|
| 15|[0.66312488908446...|       16|
| 16|[0.72644646115640...|      166|
| 17|[0.32210572380128...|      197|
| 18|[0.66680355567329...|       61|
| 19|[0.87055594653295...|       55|
| 20|[0.96600507545438...|       89|
+---+--------------------+---------+

现在,我想应用一个昂贵的UDF,计算时间与数据阵列的大小成正比。我想知道如何重新分配我的数据,以使每个分区的记录数量近似相同*data_size;(即,数据点不仅是记录)。

如果只做df.repartition(100),我可能会得到一些分区,其中包含一些非常大的阵列,这些数组是整个火花阶段的瓶颈(所有其他TAK已经完成)。如果我只能选择大量的分区,这些分区将(几乎)确保每个记录都在单独的分区中。但是有其他方法吗?

正如您所说,您可以增加分区量。我通常使用核心数的倍数:spark上下文默认并行性 * 2-3 ..
就您而言,您可以使用更大的乘数。

另一个解决方案是以这种方式过滤DF:

  • DF只有更大的数组
  • 与剩下的df

然后,您可以重新分配每个,执行计算并将其汇总。

当心,由于您有大量的排空,因此重新分配可能很昂贵。

您可以看一下这些幻灯片(27 ):https://www.slideshare.net/sparksummit/custom-applications-with-sparks-sparks-rdd-sparks-smummit-summit-summit-summit-asmit-talk-talk-talk-talk-talk-talk-talk-tell-tejas-tejas-tejas--tejas--tejas--tejas--tejas--tejas--teg-patil

他们经历了非常糟糕的数据倾斜,不得不以有趣的方式处理。

最新更新