我正在尝试在数据框架中创建一个新列,该列只是现有列的改组版本。我能够使用如何在Spark DataFrame中进行冲洗中描述的方法在数据框架中随机订购行?进行洗牌。
import pyspark
import pyspark.sql.functions as F
spark = pyspark.sql.SparkSession.builder.getOrCreate()
df = spark.range(5).toDF("x")
df.show()
#> +---+
#> | x|
#> +---+
#> | 0|
#> | 1|
#> | 2|
#> | 3|
#> | 4|
#> +---+
# the rows appear to be shuffled
ordered_df = df.orderBy(F.rand())
ordered_df.show()
#> +---+
#> | x|
#> +---+
#> | 0|
#> | 2|
#> | 3|
#> | 4|
#> | 1|
#> +---+
# ...but when i try to add this column to the df, they are no longer shuffled
df.withColumn('y', ordered_df.x).show()
#> +---+---+
#> | x| y|
#> +---+---+
#> | 0| 0|
#> | 1| 1|
#> | 2| 2|
#> | 3| 3|
#> | 4| 4|
#> +---+---+
由reprexpy软件包在2019-06-28创建
一些注释:
- 我想找到一个解决方案,其中数据保留在火花中。例如,我不想使用需要将数据移出JVM的用户定义功能。
- pyspark中的解决方案:数据框中的随机行对我不起作用(请参见下文(。
df = spark.sparkContext.parallelize(range(5)).map(lambda x: (x, )).toDF(["x"])
df.withColumn('y', df.orderBy(F.rand()).x).show()
#> +---+---+
#> | x| y|
#> +---+---+
#> | 0| 0|
#> | 1| 1|
#> | 2| 2|
#> | 3| 3|
#> | 4| 4|
#> +---+---+
- 我必须在许多列中洗净行,并且每列必须独立于其他列进行洗牌。因此,我不想在https://stackoverflow.com/a/45889539中使用
zipWithIndex()
解决方案,因为该解决方案将要求我在数据上运行许多加入(我假设这将是时间密集型(。
您可以使用窗口函数来完成此操作,以分配每行的随机索引,在单独的DF中再次进行此操作,然后在索引上加入:
>>> from pyspark.sql.window import Window
>>> import pyspark.sql.functions as F
>>> df = spark.range(5).toDF("x")
>>> left = df.withColumn("rnd", F.row_number().over(Window.orderBy(F.rand())))
>>> right = df.withColumnRenamed("x", "y").withColumn("rnd", F.row_number().over(Window.orderBy(F.rand())))
>>> dff = left.join(right, left.rnd == right.rnd).drop("rnd")
>>> dff.show()
19/06/29 13:17:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
19/06/29 13:17:04 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
+---+---+
| x| y|
+---+---+
| 3| 3|
| 2| 0|
| 0| 2|
| 1| 1|
| 4| 4|
+---+---+
正如警告所暗示的那样,这在实践中可能不是一个好主意。