如何使用groupby和aggregate将pyspark数据帧中的行与多列连接起来



我有一个包含多列的pyspark数据帧。例如下面的一个。

from pyspark.sql import Row
l = [('Jack',"a","p"),('Jack',"b","q"),('Bell',"c","r"),('Bell',"d","s")]
rdd = sc.parallelize(l)
score_rdd = rdd.map(lambda x: Row(name=x[0], letters1=x[1], letters2=x[2]))
score_card = sqlContext.createDataFrame(score_rdd)
+----+--------+--------+
|name|letters1|letters2|
+----+--------+--------+
|Jack|       a|       p|
|Jack|       b|       q|
|Bell|       c|       r|
|Bell|       d|       s|
+----+--------+--------+

现在我想按";name";并将两列的每一行中的值连接起来。我知道如何做到这一点,但假设有数千行,那么我的代码就会变得非常难看。这是我的解决方案。

import pyspark.sql.functions as f
t = score_card.groupby("name").agg(
f.concat_ws("",collect_list("letters1").alias("letters1")),
f.concat_ws("",collect_list("letters2").alias("letters2"))
)

这是我将其保存在CSV文件中时得到的输出。

+----+--------+--------+
|name|letters1|letters2|
+----+--------+--------+
|Jack|      ab|      pq|
|Bell|      cd|      rs|
+----+--------+--------+

但我主要关心的是的这两行代码

f.concat_ws("",collect_list("letters1").alias("letters1")),
f.concat_ws("",collect_list("letters2").alias("letters2"))

如果有数千列,那么我将不得不重复上面的代码数千次。有没有一个更简单的解决方案,这样我就不必对每一列重复f.concat_ws((了?

我到处找都找不到解决办法。

是的,您可以在agg函数内部使用for循环,并在df.columns中迭代。如果有帮助,请告诉我。

from pyspark.sql import functions as F
df.show()
# +--------+--------+----+
# |letters1|letters2|name|
# +--------+--------+----+
# |       a|       p|Jack|
# |       b|       q|Jack|
# |       c|       r|Bell|
# |       d|       s|Bell|
# +--------+--------+----+
df.groupBy("name").agg( *[F.array_join(F.collect_list(column), "").alias(column) for column in df.columns if column !='name' ]).show()
# +----+--------+--------+
# |name|letters1|letters2|
# +----+--------+--------+
# |Bell|      cd|      rs|
# |Jack|      ab|      pq|
# +----+--------+--------+

最新更新