Suppose I have the following DataFrame.
import pyspark.sql.functions as f
from pyspark.sql.window import Window
l = [(9, 1, 'A'),
     (9, 2, 'B'),
     (9, 3, 'C'),
     (9, 4, 'D'),
     (10, 1, 'A'),
     (10, 2, 'B')]
df = spark.createDataFrame(l, ['prod', 'rank', 'value'])
df.show()
+----+----+-----+
|prod|rank|value|
+----+----+-----+
| 9| 1| A|
| 9| 2| B|
| 9| 3| C|
| 9| 4| D|
| 10| 1| A|
| 10| 2| B|
+----+----+-----+
How can I create a new DataFrame that collects, for each prod, the values of the value column in rank order?
Desired output:
l = [(9, ['A', 'B', 'C', 'D']),
     (10, ['A', 'B'])]
l = spark.createDataFrame(l, ['prod', 'conc'])
+----+------------+
|prod| conc|
+----+------------+
| 9|[A, B, C, D]|
| 10| [A, B]|
+----+------------+
# Sort first; note row order is not guaranteed to survive the reduceByKey shuffle.
df = df.orderBy(['prod', 'rank'], ascending=[1, 1])
# Wrap each value in a list so single-row groups also produce a list.
df = (df.rdd.map(lambda r: (r.prod, [r.value]))
        .reduceByKey(lambda x, y: x + y).toDF(['prod', 'conc']))
df.show()
+----+------------+
|prod| conc|
+----+------------+
| 9|[A, B, C, D]|
| 10| [A, B]|
+----+------------+
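A pure-DataFrame alternative that avoids dropping to the RDD API is a sketch along these lines (assuming Spark 2.x or later; result is a name I am introducing, and it reuses the df and the f import from above):
# Collect (rank, value) structs per prod; sort_array orders the structs by
# their first field (rank), then pairs.value extracts just the values.
result = (df.groupBy('prod')
            .agg(f.sort_array(f.collect_list(f.struct('rank', 'value'))).alias('pairs'))
            .select('prod', f.col('pairs.value').alias('conc')))
result.show()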
Here is a quick solution based on what you specified. Hope it helps.
w = Window.partitionBy('prod').orderBy('rank')
desiredDF = (df.withColumn('values_list', f.collect_list('value').over(w))
               .groupBy('prod')
               .agg(f.max('values_list').alias('conc')))
desiredDF.show()
+----+------------+
|prod| conc|
+----+------------+
| 9|[A, B, C, D]|
| 10| [A, B]|
+----+------------+
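The f.max aggregation works here because each row's windowed list is a prefix of the partition's full list, so the longest list compares greatest. An alternative sketch under the same imports widens the window frame to the whole partition so no aggregation trick is needed (w_full is a name I am introducing):
# With an unbounded frame every row sees the complete list for its prod,
# so one deduplicated row per prod is the final answer.
w_full = (Window.partitionBy('prod').orderBy('rank')
                .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing))
desiredDF = (df.withColumn('conc', f.collect_list('value').over(w_full))
               .select('prod', 'conc')
               .dropDuplicates(['prod']))
desiredDF.show()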