Conditionally update or add multiple Spark DataFrame columns



I have a DataFrame with the following schema and some sample records:

// df.printSchema
root
|-- CUST_NAME: string (nullable = true)
|-- DIRECTION: string (nullable = true)
|-- BANK_NAME: string (nullable = true)
|-- TXN_AMT: double (nullable = false)

// df.show(false)
+---------+---------+---------+-------+
|CUST_NAME|DIRECTION|BANK_NAME|TXN_AMT|
+---------+---------+---------+-------+
|ABC      |D        |Bank1    |300.0  |
|DEF      |C        |Bank2    |10.0   |
|GHI      |C        |Bank3    |12.0   |
|JKL      |D        |Bank4    |500.0  |
+---------+---------+---------+-------+

Now, based on the value in the DIRECTION column, I need to conditionally add two new columns:

  1. FROM_BANK
  2. TO_BANK

In plain code, it would look something like this:

var from_bank: String = null
var to_bank: String = null
val direction = "D"
val bank_name = "Test"
direction match {
  case "D" =>
    from_bank = bank_name
    to_bank = null
  case "C" =>
    from_bank = null
    to_bank = bank_name
}

The code above is only an illustration of what I want to achieve; I know it is not something that works directly on a Spark DataFrame.

I know I can get what I want with multiple when/otherwise clauses, like this:

import org.apache.spark.sql.functions._

val df2 = df
  .withColumn("FROM_BANK", when($"DIRECTION" === "D", $"BANK_NAME").otherwise(lit(null)))
  .withColumn("TO_BANK", when($"DIRECTION" === "C", $"BANK_NAME").otherwise(lit(null)))
df2.show(100, false)
//    +---------+---------+---------+-------+---------+-------+
//    |CUST_NAME|DIRECTION|BANK_NAME|TXN_AMT|FROM_BANK|TO_BANK|
//    +---------+---------+---------+-------+---------+-------+
//    |ABC      |D        |Bank1    |300.0  |Bank1    |null   |
//    |DEF      |C        |Bank2    |10.0   |null     |Bank2  |
//    |GHI      |C        |Bank3    |12.0   |null     |Bank3  |
//    |JKL      |D        |Bank4    |500.0  |Bank4    |null   |
//    +---------+---------+---------+-------+---------+-------+

The above approach looks simple enough, but it is very verbose, because in reality I will need to do this for a total of 8+ columns. Another option I considered is using a .map over the DataFrame, like this:

import spark.implicits._

val df3 = test_df.map(row => {
  val direction = row.getAs[String]("Direction")
  if (direction == "D")
    (row.getAs[String]("CUST_NAME"),
      row.getAs[String]("DIRECTION"),
      row.getAs[String]("BANK_NAME"),
      row.getAs[Double]("TXN_AMT"),
      row.getAs[String]("BANK_NAME"), // This will become the FROM_BANK column
      null                            // This will become the TO_BANK column
    )
  else if (direction == "C")
    (row.getAs[String]("CUST_NAME"),
      row.getAs[String]("DIRECTION"),
      row.getAs[String]("BANK_NAME"),
      row.getAs[Double]("TXN_AMT"),
      null,                           // This will become the FROM_BANK column
      row.getAs[String]("BANK_NAME")  // This will become the TO_BANK column
    )
}).toDF("CUST_NAME", "DIRECTION", "BANK_NAME", "TXN_AMOUNT", "FROM_BANK", "TO_BANK")

However, when running the above, I get the following error:

Error:(35, 26) Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.
val df3 = test_df.map(row => {

I tried to modify the above by creating a statically typed Dataset, but I still run into the same problem:

import spark.implicits._

case class Record(CUST_NAME: String, DIRECTION: String, BANK_NAME: String, TXN_AMT: Double)

val test_df4 = test_df.as[Record].map(row => {
  val direction = row.DIRECTION
  if (direction == "D")
    (
      row.CUST_NAME,
      row.DIRECTION,
      row.BANK_NAME,
      row.TXN_AMT,
      row.BANK_NAME, // This will become the FROM_BANK column
      null           // This will become the TO_BANK column
    )
  else if (direction == "C")
    (
      row.CUST_NAME,
      row.DIRECTION,
      row.BANK_NAME,
      row.TXN_AMT,
      null,          // This will become the FROM_BANK column
      row.BANK_NAME  // This will become the TO_BANK column
    )
}).toDF("CUST_NAME", "DIRECTION", "BANK_NAME", "TXN_AMOUNT", "FROM_BANK", "TO_BANK")
test_df4.show(100, false)
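
For reference, the encoder error in both attempts appears to come from the if / else if having no final else branch: the lambda's inferred result type becomes Any, for which Spark cannot find an encoder. A minimal sketch of the typed version that should satisfy the encoder, assuming the Record case class above and using Option[String] (rather than null) for the nullable columns:

val df4 = test_df.as[Record].map { r =>
  // Cover every case so the result is a single, encodable tuple type.
  val (fromBank, toBank) = r.DIRECTION match {
    case "D" => (Option(r.BANK_NAME), Option.empty[String])
    case "C" => (Option.empty[String], Option(r.BANK_NAME))
    case _   => (Option.empty[String], Option.empty[String])
  }
  (r.CUST_NAME, r.DIRECTION, r.BANK_NAME, r.TXN_AMT, fromBank, toBank)
}.toDF("CUST_NAME", "DIRECTION", "BANK_NAME", "TXN_AMOUNT", "FROM_BANK", "TO_BANK")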

I know the first option works, but I would like to do this in a more programmatic way, since I need to do it for multiple columns that all depend on the value of the DIRECTION column. Any feedback or suggestions would be much appreciated.

Thanks!

You can put the when statements in a list (or add them to a list programmatically) and then select them. That way you don't need to chain a bunch of withColumn statements. Also note that .otherwise(null) isn't needed, since that is the default behavior.

val newcols = List(
  col("*"),
  when($"DIRECTION" === "D", $"BANK_NAME").as("FROM_BANK"),
  when($"DIRECTION" === "C", $"BANK_NAME").as("TO_BANK")
)
val df2 = df.select(newcols: _*)
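
If the same pattern repeats across many columns, the list itself can be built programmatically instead of being written out by hand. A minimal sketch, assuming each new column is described by a hypothetical (column name, DIRECTION value) pair; extend the Seq with the remaining columns:

import org.apache.spark.sql.functions._
import spark.implicits._

// Hypothetical rule set: new column name -> DIRECTION value that should copy BANK_NAME into it.
val rules = Seq(
  "FROM_BANK" -> "D",
  "TO_BANK"   -> "C"
)

// Keep all existing columns and append one conditional column per rule.
val newcols = col("*") +: rules.map { case (name, dir) =>
  when($"DIRECTION" === dir, $"BANK_NAME").as(name)
}

val df2 = df.select(newcols: _*)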
