Add dictionary keys as column names and dictionary values as constant column values in a PySpark df



I have a dictionary x = {'colA': 20, 'colB': 30} and a PySpark df.

ID Value
1  ABC
1  BCD
1  AKB
2  CAB
2  AIK
3  KIB 

I want to use x to create df1 as follows:

ID Value colA colB
1  ABC    20.0  30.0
1  BCD    20.0  30.0
1  AKB    20.0  30.0
2  CAB    20.0  30.0
...

Any idea how to do this in PySpark? I know I can create constant columns like this:

from pyspark.sql.functions import lit
df1 = df.withColumn('colA', lit(20.0))
df1 = df1.withColumn('colB', lit(30.0))

but I am not sure how to do this dynamically from the dictionary.

There are ways to hide the loop, but the execution will be the same. For instance, you can use select:

from pyspark.sql.functions import lit
df2 = df.select("*", *[lit(val).alias(key) for key, val in x.items()])
df2.show()
#+---+-----+----+----+
#| ID|Value|colB|colA|
#+---+-----+----+----+
#|  1|  ABC|  30|  20|
#|  1|  BCD|  30|  20|
#|  1|  AKB|  30|  20|
#|  2|  CAB|  30|  20|
#|  2|  AIK|  30|  20|
#|  3|  KIB|  30|  20|
#+---+-----+----+----+
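The "*" plus list-unpacking used in that select call is plain Python, not Spark-specific. A minimal sketch without Spark (the select function below is a toy stand-in for DataFrame.select, not the real API):

```python
x = {'colA': 20, 'colB': 30}

def select(*cols):
    # toy stand-in for DataFrame.select: just records the column expressions
    return list(cols)

# "*" keeps the existing columns; the unpacked comprehension adds one
# "constant AS name" expression per dictionary entry
result = select("*", *[f"{val} AS {key}" for key, val in x.items()])
print(result)  # ['*', '20 AS colA', '30 AS colB']
```

Since Python 3.7 dictionaries preserve insertion order, so the generated columns appear in the order the keys were defined.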

Or functools.reduce with withColumn:

from functools import reduce
df3 = reduce(lambda df, key: df.withColumn(key, lit(x[key])), x, df)
df3.show()
# Same as above
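Spark aside, the reduce pattern itself can be sketched with plain Python dicts (a toy stand-in for the DataFrame, not the Spark API): the accumulator is threaded through each key, and each step returns a new "frame" with one extra column, mirroring one withColumn call per key.

```python
from functools import reduce

x = {'colA': 20, 'colB': 30}

# a plain dict stands in for the DataFrame row schema
df = {'ID': 1, 'Value': 'ABC'}

# each step merges one new constant column into the accumulator,
# just as df.withColumn(key, lit(x[key])) does per iteration
df3 = reduce(lambda d, key: {**d, key: x[key]}, x, df)
print(df3)  # {'ID': 1, 'Value': 'ABC', 'colA': 20, 'colB': 30}
```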

Or pyspark.sql.functions.struct with select() and the "x.*" syntax:

from pyspark.sql.functions import struct
df4 = df.withColumn('x', struct([lit(val).alias(key) for key, val in x.items()]))\
    .select("ID", "Value", "x.*")
df4.show()
#Same as above

But if you look at the execution plans of these methods, you will see that they are exactly the same:

df2.explain()
#== Physical Plan ==
#*Project [ID#44L, Value#45, 30 AS colB#151, 20 AS colA#152]
#+- Scan ExistingRDD[ID#44L,Value#45]
df3.explain()
#== Physical Plan ==
#*Project [ID#44L, Value#45, 30 AS colB#102, 20 AS colA#107]
#+- Scan ExistingRDD[ID#44L,Value#45]
df4.explain()
#== Physical Plan ==
#*Project [ID#44L, Value#45, 30 AS colB#120, 20 AS colA#121]
#+- Scan ExistingRDD[ID#44L,Value#45]

Further, if you compare the loop method from @anil's answer:

df1 = df
for key in x:
    df1 = df1.withColumn(key, lit(x[key]))
df1.explain()
#== Physical Plan ==
#*Project [ID#44L, Value#45, 30 AS colB#127, 20 AS colA#132]
#+- Scan ExistingRDD[ID#44L,Value#45]

you will see that it is the same as well.

Loop through the dictionary as follows:

df1 = df
for key in x:
    df1 = df1.withColumn(key, lit(x[key]))
