我有一个数据帧,其中包含字符串类型的A列和B列。让我们假设以下数据帧
+--------+
|A | B |
|1a | 1b |
|2a | 2b |
我想添加第三列来创建A和B列的地图
+-------------------------+
|A | B | C |
|1a | 1b | {A->1a, B->1b} |
|2a | 2b | {A->2a, B->2b} |
我正在尝试通过以下方式执行此操作。我有 udf 接收数据帧并返回地图
val test = udf((dataFrame: DataFrame) => {
val result = new mutable.HashMap[String, String]
dataFrame.columns.foreach(col => {
result.put(col, dataFrame(col).asInstanceOf[String])
})
result
})
我以以下方式调用此 udf,当我尝试将数据集作为文字传递时,它会抛出 RunTimeException
。df.withColumn("C", Helper.test(lit(df.select(df.columns.head, df.columns.tail: _*)))
我不想将 df('a'( df('b'( 传递给我的助手 udf,因为我希望它们是我可以选择的列的通用列表。有什么指示吗?
映射方式
您可以将map
内置函数用作
import org.apache.spark.sql.functions._
val columns = df.columns
df.withColumn("C", map(columns.flatMap(x => Array(lit(x), col(x))): _*)).show(false)
应该给你
+---+---+---------------------+
|A |B |C |
+---+---+---------------------+
|1a |1b |Map(A -> 1a, B -> 1b)|
|2a |2b |Map(A -> 2a, B -> 2b)|
+---+---+---------------------+
乌德夫方式
或者,您可以使用将udf
定义为
//collecting column names to be used in the udf
val columns = df.columns
//definining udf function
import org.apache.spark.sql.functions._
def createMapUdf = udf((names: Seq[String], values: Seq[String])=> names.zip(values).toMap)
//calling udf function
df.withColumn("C", createMapUdf(array(columns.map(x => lit(x)): _*), array(col("A"), col("B")))).show(false)
我希望答案对您有所帮助
Ramesh Maharjan - 你的答案已经很棒了,我的答案只是使用字符串插值以动态方式制作你的 UDF 答案。
列D
以动态方式提供。
df.withColumn("C", createMapUdf(array(columns.map(x => lit(x)): _*),
array(col("A"), col("B"))))
.withColumn("D", createMapUdf(array(columns.map(x => lit(x)): _*),
array(columns.map(x => col(s"$x") ): _* ))).show()