I have a sample DataFrame:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

df = spark.createDataFrame([
    (1, 0, 1, 1, 1, 1, "something"),
    (2, 0, 1, 1, 1, 0, "something"),
    (3, 1, 0, 0, 0, 0, "something"),
    (4, 0, 1, 0, 0, 0, "something"),
    (5, 1, 0, 0, 0, 0, "something"),
    (6, 0, 0, 0, 0, 0, "something")
], ["id", "a", "b", "c", "d", "e", "extra_column"])
df.show()
+---+---+---+---+---+---+------------+
| id|  a|  b|  c|  d|  e|extra_column|
+---+---+---+---+---+---+------------+
|  1|  0|  1|  1|  1|  1|   something|
|  2|  0|  1|  1|  1|  0|   something|
|  3|  1|  0|  0|  0|  0|   something|
|  4|  0|  1|  0|  0|  0|   something|
|  5|  1|  0|  0|  0|  0|   something|
|  6|  0|  0|  0|  0|  0|   something|
+---+---+---+---+---+---+------------+
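For each row, I want to concatenate the names of the columns whose value is 1 to produce a key. I don't need to display this result, but it is the intermediate step I need to solve: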
df_row_concat = spark.createDataFrame([
    (1, 0, 1, 1, 1, 1, "something", "bcde"),
    (2, 0, 1, 1, 1, 0, "something", "bcd"),
    (3, 1, 0, 0, 0, 0, "something", "a"),
    (4, 0, 1, 0, 0, 0, "something", "b"),
    (5, 1, 0, 0, 0, 0, "something", "a"),
    (6, 0, 0, 0, 0, 0, "something", "")
], ["id", "a", "b", "c", "d", "e", "extra_column", "key"])
df_row_concat.show()
+---+---+---+---+---+---+------------+----+
| id|  a|  b|  c|  d|  e|extra_column| key|
+---+---+---+---+---+---+------------+----+
|  1|  0|  1|  1|  1|  1|   something|bcde|
|  2|  0|  1|  1|  1|  0|   something| bcd|
|  3|  1|  0|  0|  0|  0|   something|   a|
|  4|  0|  1|  0|  0|  0|   something|   b|
|  5|  1|  0|  0|  0|  0|   something|   a|
|  6|  0|  0|  0|  0|  0|   something|    |
+---+---+---+---+---+---+------------+----+
I can do the last part myself, but to complete the example: I want to count the rows for each key (ignoring the empty key) and output:
+----+-----+
| key|value|
+----+-----+
|   a|    2|
|   b|    1|
| bcd|    1|
|bcde|    1|
+----+-----+
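My actual dataset is much longer and wider. I could hard-code every combination, but there must be a more efficient way to loop over a list of the columns to consider (e.g. column_list = ["a", "b", "c", "d", "e"]). It may not be necessary, but I included extra_column because my dataset has other columns that should not be considered.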
I don't see anything wrong with using a loop here; a list comprehension over the columns builds the key:
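from pyspark.sql import functions as F

cols = ['a', 'b', 'c', 'd', 'e']
# For each column, emit its name when the value is 1, otherwise '', then concatenate the pieces
temp = df.withColumn('key', F.concat(*[F.when(F.col(c) == 1, c).otherwise('') for c in cols]))
temp.show()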
+---+---+---+---+---+---+------------+----+
| id|  a|  b|  c|  d|  e|extra_column| key|
+---+---+---+---+---+---+------------+----+
|  1|  0|  1|  1|  1|  1|   something|bcde|
|  2|  0|  1|  1|  1|  0|   something| bcd|
|  3|  1|  0|  0|  0|  0|   something|   a|
|  4|  0|  1|  0|  0|  0|   something|   b|
|  5|  1|  0|  0|  0|  0|   something|   a|
|  6|  0|  0|  0|  0|  0|   something|    |
+---+---+---+---+---+---+------------+----+
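Before aggregating, the key logic can be sanity-checked without a Spark session. The following is a minimal plain-Python sketch that mirrors the `F.when(...).otherwise('')` / `F.concat` expression row by row. It assumes the rows are re-typed from the question as tuples, and `build_key` is a hypothetical helper name, not part of PySpark.

```python
# Sample rows re-typed from the question: (id, a, b, c, d, e, extra_column)
header = ["id", "a", "b", "c", "d", "e", "extra_column"]
rows = [
    (1, 0, 1, 1, 1, 1, "something"),
    (2, 0, 1, 1, 1, 0, "something"),
    (3, 1, 0, 0, 0, 0, "something"),
    (4, 0, 1, 0, 0, 0, "something"),
    (5, 1, 0, 0, 0, 0, "something"),
    (6, 0, 0, 0, 0, 0, "something"),
]
cols = ["a", "b", "c", "d", "e"]  # only these columns take part in the key


def build_key(record, cols):
    """Hypothetical helper: join the names of the columns in `cols` whose value is 1.

    Mirrors F.concat(*[F.when(F.col(c) == 1, c).otherwise('') for c in cols]):
    a column contributes its name when it equals 1 and '' otherwise.
    """
    return "".join(c if record[c] == 1 else "" for c in cols)


# Turn each tuple into a dict keyed by column name, then build its key
records = [dict(zip(header, row)) for row in rows]
keys = [build_key(r, cols) for r in records]
print(keys)  # ['bcde', 'bcd', 'a', 'b', 'a', '']
```

Because the key is built by iterating over `cols` in order, the order of the list determines the character order in the key, and columns such as `extra_column` are ignored simply by leaving them out of `cols`.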
(temp
.groupBy('key')
.agg(F.count('*').alias('value'))
.where(F.col('key') != '')
.show()
)
+----+-----+
| key|value|
+----+-----+
|bcde|    1|
|   b|    1|
|   a|    2|
| bcd|    1|
+----+-----+
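Note that Spark does not guarantee the row order of a `groupBy` result, which is why the rows above come out in a different order than in the question; appending `.orderBy('key')` before `.show()` gives a stable order. The counts themselves can be cross-checked in plain Python with `collections.Counter`. This is a minimal sketch assuming the keys from the `temp` table above, re-typed here so the snippet runs on its own.

```python
from collections import Counter

# Keys from the intermediate `temp` table (one per row; '' means no column equals 1)
keys = ["bcde", "bcd", "a", "b", "a", ""]

# Count rows per key and drop the empty key, analogous to
# groupBy('key').agg(F.count('*').alias('value')).where(F.col('key') != '')
counts = Counter(k for k in keys if k != "")

# Sort by key for a deterministic order (Spark's groupBy output order is not guaranteed)
for key, value in sorted(counts.items()):
    print(f"{key:>4} {value}")
```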