I created a toy Spark DataFrame:
import numpy as np
import pyspark
from pyspark.sql import functions as sf
from pyspark.sql import functions as F
# sc = pyspark.SparkContext()
# sqlc = pyspark.SQLContext(sc)
df = spark.createDataFrame([('csc123','sr1', 'tac1', 'abc'),
('csc123','sr2', 'tac1', 'abc'),
('csc234','sr3', 'tac2', 'bvd'),
('csc345','sr5', 'tac2', 'bvd')
],
['bug_id', 'sr_link', 'TAC_engineer','de_manager'])
df.show()
+------+-------+------------+----------+
|bug_id|sr_link|TAC_engineer|de_manager|
+------+-------+------------+----------+
|csc123|    sr1|        tac1|       abc|
|csc123|    sr2|        tac1|       abc|
|csc234|    sr3|        tac2|       bvd|
|csc345|    sr5|        tac2|       bvd|
+------+-------+------------+----------+
Then I tried to aggregate by bug_id and build an array of [sr_link, sr_link] for each bug:
#df = spark.createDataFrame([('row11','row12'), ('row21','row22')], ['colname1', 'colname2'])
df_drop_dup = df.select('bug_id', 'de_manager').dropDuplicates()
df = df.withColumn('joined_column',
sf.concat(sf.col('sr_link'),sf.lit(' '), sf.col('TAC_engineer')))
df_sev_arr = df.groupby("bug_id").agg(F.collect_set("joined_column")).withColumnRenamed("collect_set(joined_column)","sr_array")
df = df_drop_dup.join(df_sev_arr, on=['bug_id'], how='inner')
df.show()
This is the output:
+------+----------+--------------------+
|bug_id|de_manager|            sr_array|
+------+----------+--------------------+
|csc345|       bvd|          [sr5 tac2]|
|csc123|       abc|[sr2 tac1, sr1 tac1]|
|csc234|       bvd|          [sr3 tac2]|
+------+----------+--------------------+
But the output I actually expect is:
+------+----------+----------------------------------------------------------------------+
|bug_id|de_manager|                                                              sr_array|
+------+----------+----------------------------------------------------------------------+
|csc345|       bvd|                                   [{sr_link: sr5, TAC_engineer:tac2}]|
|csc123|       abc|[{sr_link: sr2, TAC_engineer:tac1},{sr_link: sr1, TAC_engineer: tac1}]|
|csc234|       bvd|                                  [{sr_link: sr3, TAC_engineer: tac2}]|
+------+----------+----------------------------------------------------------------------+
because I want the final output to be saved in JSON format, like:
'bug_id': 'csc123'
'de_manager': 'abc'
'sr_array':
'sr_link': 'sr2', 'TAC_engineer': 'tac1'
'sr_link': 'sr1', 'TAC_engineer': 'tac1'
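For reference, the target shape sketched above is not valid JSON as written. A minimal plain-Python illustration of what one such record might look like as a real JSON document, using only the standard `json` module (the variable names `record` and `as_json` are made up for this sketch, and the values are copied from the sample rows):

```python
import json

# One target record: scalar fields plus an array of nested objects.
record = {
    "bug_id": "csc123",
    "de_manager": "abc",
    "sr_array": [
        {"sr_link": "sr2", "TAC_engineer": "tac1"},
        {"sr_link": "sr1", "TAC_engineer": "tac1"},
    ],
}

# Serialize to a JSON string; each sr_array element is an object, not a string.
as_json = json.dumps(record)
print(as_json)
```

The point to note is that each element of `sr_array` is itself an object with two keys, which corresponds to an array of structs in Spark rather than an array of strings.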
Can anyone help? Sorry, I am very new to MapType in Spark DataFrames.
Based on your requirements, the first part stays the same:
from pyspark.sql import functions as F
# sc = pyspark.SparkContext()
# sqlc = pyspark.SQLContext(sc)
df = spark.createDataFrame([('csc123','sr1', 'tac1', 'abc'),
('csc123','sr2', 'tac1', 'abc'),
('csc234','sr3', 'tac2', 'bvd'),
('csc345','sr5', 'tac2', 'bvd')
],
['bug_id', 'sr_link', 'TAC_engineer','de_manager'])
df.show()
I only modified the second part:
>>> df_drop_dup = df.select('bug_id', 'de_manager').dropDuplicates()
I replaced the withColumnRenamed call with alias, and added the to_json and struct functions to get the desired output. I also made a small change to the DataFrame naming (df --> df1):
>>> df1 = df.withColumn('joined_column', F.to_json(F.struct(F.col('sr_link'), F.col('TAC_engineer'))))
>>> df_sev_arr = df1.groupby("bug_id").agg(F.collect_set("joined_column").alias("sr_array"))
>>> df = df_drop_dup.join(df_sev_arr, on=['bug_id'], how='inner')
>>> df.show(truncate=False)
+------+----------+----------------------------------------------------------------------------------+
|bug_id|de_manager|sr_array                                                                          |
+------+----------+----------------------------------------------------------------------------------+
|csc345|bvd       |[{"sr_link":"sr5","TAC_engineer":"tac2"}]                                         |
|csc123|abc       |[{"sr_link":"sr1","TAC_engineer":"tac1"}, {"sr_link":"sr2","TAC_engineer":"tac1"}]|
|csc234|bvd       |[{"sr_link":"sr3","TAC_engineer":"tac2"}]                                         |
+------+----------+----------------------------------------------------------------------------------+
Let me know if you have any questions about this.