Concatenate PySpark DataFrame column names by value and sum

I have a sample dataframe:

df = spark.createDataFrame([
(1, 0, 1, 1, 1, 1, "something"),
(2, 0, 1, 1, 1, 0, "something"),
(3, 1, 0, 0, 0, 0, "something"),
(4, 0, 1, 0, 0, 0, "something"),
(5, 1, 0, 0, 0, 0, "something"),
(6, 0, 0, 0, 0, 0, "something")
], ["int" * 6, "string"]) 
.toDF("id", "a", "b", "c", "d", "e", "extra_column")
df.show()
+---+---+---+---+---+---+------------+
| id|  a|  b|  c|  d|  e|extra_column|
+---+---+---+---+---+---+------------+
|  1|  0|  1|  1|  1|  1|   something|
|  2|  0|  1|  1|  1|  0|   something|
|  3|  1|  0|  0|  0|  0|   something|
|  4|  0|  1|  0|  0|  0|   something|
|  5|  1|  0|  0|  0|  0|   something|
|  6|  0|  0|  0|  0|  0|   something|
+---+---+---+---+---+---+------------+

I want to concatenate, per row, the names of the columns whose value equals 1 into a key. I don't need to display this result, but it is an intermediate step I need to work out:

df_row_concat = spark.createDataFrame([
(1, 0, 1, 1, 1, 1, "something", "bcde"),
(2, 0, 1, 1, 1, 0, "something", "bcd"),
(3, 1, 0, 0, 0, 0, "something", "a"),
(4, 0, 1, 0, 0, 0, "something", "b"),
(5, 1, 0, 0, 0, 0, "something", "a"),
(6, 0, 0, 0, 0, 0, "something", "")
], ["int" * 6, "string" * 2]) 
.toDF("id", "a", "b", "c", "d", "e", "extra_column", "key")
df_row_concat.show()
+---+---+---+---+---+---+------------+----+
| id|  a|  b|  c|  d|  e|extra_column| key|
+---+---+---+---+---+---+------------+----+
|  1|  0|  1|  1|  1|  1|   something|bcde|
|  2|  0|  1|  1|  1|  0|   something| bcd|
|  3|  1|  0|  0|  0|  0|   something|   a|
|  4|  0|  1|  0|  0|  0|   something|   b|
|  5|  1|  0|  0|  0|  0|   something|   a|
|  6|  0|  0|  0|  0|  0|   something|    |
+---+---+---+---+---+---+------------+----+

The last part I can do on my own, but to complete the example, I want to sum over the key values and output:

+----+-----+
| key|value|
+----+-----+
|   a|    2|
|   b|    1|
| bcd|    1|
|bcde|    1|
+----+-----+

My actual dataset is longer and wider. I could hard-code every combination, but there must be a more efficient way to loop over a list of the columns to consider (e.g. column_list = ["a", "b", "c", "d", "e"]). It may not be necessary, but I included extra_column because my dataset contains other columns that should not be considered.
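For a wider dataset the list does not have to be typed out by hand. A minimal sketch, assuming the only columns to leave out are id and extra_column (names taken from the example above):

exclude = {"id", "extra_column"}  # columns that should not feed into the key
column_list = [c for c in df.columns if c not in exclude]
print(column_list)  # ['a', 'b', 'c', 'd', 'e']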

I don't see anything wrong with writing a for loop here:

from pyspark.sql import functions as F
cols = ['a', 'b', 'c', 'd', 'e']
temp = (df.withColumn('key', F.concat(*[F.when(F.col(c) == 1, c).otherwise('') for c in cols])))
temp.show()
+---+---+---+---+---+---+------------+----+
| id|  a|  b|  c|  d|  e|extra_column| key|
+---+---+---+---+---+---+------------+----+
|  1|  0|  1|  1|  1|  1|   something|bcde|
|  2|  0|  1|  1|  1|  0|   something| bcd|
|  3|  1|  0|  0|  0|  0|   something|   a|
|  4|  0|  1|  0|  0|  0|   something|   b|
|  5|  1|  0|  0|  0|  0|   something|   a|
|  6|  0|  0|  0|  0|  0|   something|    |
+---+---+---+---+---+---+------------+----+
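One detail worth noting: F.concat returns null if any of its arguments is null, which is why the .otherwise('') branch is there. A variant using concat_ws, which simply skips nulls, is sketched below (same cols list as above):

temp = df.withColumn(
    'key',
    # concat_ws ignores null inputs, so no otherwise('') fallback is needed
    F.concat_ws('', *[F.when(F.col(c) == 1, c) for c in cols])
)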
(temp
.groupBy('key')
.agg(F.count('*').alias('value'))
.where(F.col('key') != '')
.show()
)
+----+-----+
| key|value|
+----+-----+
|bcde|    1|
|   b|    1|
|   a|    2|
| bcd|    1|
+----+-----+
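The groupBy result comes back in an arbitrary order; if the sorted layout from the question is wanted, an orderBy on the key can be appended (a minimal variation of the same query):

(temp
.groupBy('key')
.agg(F.count('*').alias('value'))
.where(F.col('key') != '')
.orderBy('key')
.show()
)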
