I have a column like this:
column |
---|
[{"key":1,"value":"aaaaa"},{"key":2,"value":"bbbbb"},{"key":3,"value":"ccccc"}] |
[{"key":1,"value":"abcde"},{"key":2,"value":"bcdef"}] |
[{"key":1,"value":"edcba"},{"key":3,"value":"zxcvb"}] |
Parse it into an array of structs and then pivot on the key column. You need some ID column to group by for the pivot, so before inlining the struct array I added an id using the monotonically_increasing_id function.
from pyspark.sql import functions as F
df = spark.createDataFrame([
('[{"key":1,"value":"aaaaa"},{"key":2,"value":"bbbbb"},{"key":3,"value":"ccccc"}]',),
('[{"key":1,"value":"abcde"},{"key":2,"value":"bcdef"}]',),
('[{"key":1,"value":"edcba"},{"key":3,"value":"zxcvb"},{"key":4,"value":"qwert"}]',)
], ["column"])
test_schema = "array<struct<key:int,value:string>>"  # schema for from_json
test = (df.withColumn("column", F.from_json("column", test_schema))
.withColumn("id", F.monotonically_increasing_id())
.selectExpr("id", "inline(column)")
.groupBy("id").pivot("key").agg(F.first("value"))
.drop("id")
)
test.show()
#+-----+-----+-----+-----+
#| 1| 2| 3| 4|
#+-----+-----+-----+-----+
#|aaaaa|bbbbb|ccccc| null|
#|abcde|bcdef| null| null|
#|edcba| null|zxcvb|qwert|
#+-----+-----+-----+-----+
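To see what the pivot step does without spinning up a Spark session, here is the same reshaping sketched in plain Python on two of the sample rows (hypothetical stand-in code, not part of the Spark answer):

```python
import json

# Sample rows matching the table above.
rows = [
    '[{"key":1,"value":"aaaaa"},{"key":2,"value":"bbbbb"},{"key":3,"value":"ccccc"}]',
    '[{"key":1,"value":"abcde"},{"key":2,"value":"bcdef"}]',
]

# Each source row becomes one output row; pivoting turns each "key"
# into a column holding the matching "value". The row index plays the
# role of the monotonically_increasing_id grouping column.
pivoted = [{e["key"]: e["value"] for e in json.loads(raw)} for raw in rows]
print(pivoted)
# [{1: 'aaaaa', 2: 'bbbbb', 3: 'ccccc'}, {1: 'abcde', 2: 'bcdef'}]
```

Keys missing from a row (like 3 in the second row) simply produce no entry, which is where the nulls in the Spark output come from.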
Then, if needed, you can rename the columns to add a column_* prefix:
test = test.select(*[F.col(c).alias(f"column_{c}") for c in test.columns])
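The renaming itself is just a string list comprehension, so it can be checked without Spark (a minimal sketch, assuming the pivoted column names from the output above):

```python
# Pivoted key columns, as they would appear in test.columns.
columns = ["1", "2", "3", "4"]

# Same comprehension as in the select above, applied to plain strings.
renamed = [f"column_{c}" for c in columns]
print(renamed)
# ['column_1', 'column_2', 'column_3', 'column_4']
```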