PySpark-向Dictionary中添加一个复合密钥名称



如果有任何错误,请告诉我,因为这是我的第一篇文章。

这是数据帧df:列"a"是一个字符串,其余部分是浮动的。

我为数据帧添加了一个图像,因为当我手动添加数据时,格式会变得一团糟。

数据帧

在给定的dataFrame-df上,我想按列"a"分组,并找到其他列的最小值和最大值。我想把输出作为字典。因此,我将生成的pyspark数据帧转换为json,并使用json.loads将其转换为Dictionary。

Code snippet:
import pyspark.sql.functions as F
cols=['b','c']
req_cols=[F.struct(F.first('a').alias('a'),F.max(col).alias('max'),F.min(col).lias('min')).alias(col) for col in cols]
df_cache=df.groupby('a').agg(*req_cols).cache()
dict=json.loads(df_cache.toJSON.collect()[0])

我的输出:

{
"b": {
"max":
"min":
"a":'10'
},
"c": {
"max":
"min":
"a":'10'
},
}

所需输出:

{
"b_10": {
"max":
"min":
"a":'10'
},
"c_10": {
"max":
"min":
"a":'10'
},
"b_20": {
"max":
"min":
"a":'20'
},
"c_20": {
"max":
"min":
"a":'20'
},
"b_30": {
"max":
"min":
"a":'30'
},
"c_30": {
"max":
"min":
"a":'30'
},
}

输出

分组时使用pivot

df_cache = df.groupBy().pivot('a').agg(*req_cols).cache()

列名将与您想要的输出不同,所以如果您想要,您需要更改它们

最新更新