I have a DataFrame with the following schema and some sample records:
// df.printSchema
root
|-- CUST_NAME: string (nullable = true)
|-- DIRECTION: string (nullable = true)
|-- BANK_NAME: string (nullable = true)
|-- TXN_AMT: double (nullable = false)
// df.show(false)
+---------+---------+---------+-------+
|CUST_NAME|DIRECTION|BANK_NAME|TXN_AMT|
+---------+---------+---------+-------+
|ABC |D |Bank1 |300.0 |
|DEF |C |Bank2 |10.0 |
|GHI |C |Bank3 |12.0 |
|JKL |D |Bank4 |500.0 |
+---------+---------+---------+-------+
Now, depending on the value in the DIRECTION column, I need to conditionally add two new columns:
- FROM_BANK
- TO_BANK
In plain code, it would look something like this:
var from_bank: String = null
var to_bank: String = null
val direction = "D"
val bank_name = "Test"
direction match {
  case "D" =>
    from_bank = bank_name
    to_bank = null
  case "C" =>
    from_bank = null
    to_bank = bank_name
}
The code above is just an illustration of what I want to achieve; I know it is not something that works with a Spark DataFrame.
I know I can get what I want with multiple when/otherwise clauses, like this:
val df2 = df.withColumn(
"FROM_BANK",
when($"DIRECTION" === "D", $"BANK_NAME")
.otherwise(lit(null))
)
.withColumn(
"TO_BANK",
when($"DIRECTION" === "C", $"BANK_NAME")
.otherwise(lit(null))
)
df2.show(100,false)
// +---------+---------+---------+-------+---------+-------+
// |CUST_NAME|DIRECTION|BANK_NAME|TXN_AMT|FROM_BANK|TO_BANK|
// +---------+---------+---------+-------+---------+-------+
// |ABC |D |Bank1 |300.0 |Bank1 |null |
// |DEF |C |Bank2 |10.0 |null |Bank2 |
// |GHI |C |Bank3 |12.0 |null |Bank3 |
// |JKL |D |Bank4 |500.0 |Bank4 |null |
// +---------+---------+---------+-------+---------+-------+
The approach above looks simple enough, but it is very verbose, because in reality I will need to do this for eight more columns in total. Another option I considered is using the .map function on the DataFrame, like this:
import spark.implicits._
val df3 = test_df.map(row => {
val direction = row.getAs[String]("DIRECTION")
if (direction == "D")
(row.getAs[String]("CUST_NAME"),
row.getAs[String]("DIRECTION"),
row.getAs[String]("BANK_NAME"),
row.getAs[Double]("TXN_AMT"),
row.getAs[String]("BANK_NAME"), // This will become the FROM_BANK column
null // This will become the TO_BANK column
)
else if (direction == "C")
(row.getAs[String]("CUST_NAME"),
row.getAs[String]("DIRECTION"),
row.getAs[String]("BANK_NAME"),
row.getAs[Double]("TXN_AMT"),
null, // This will become the FROM_BANK column
row.getAs[String]("BANK_NAME") // This will become the TO_BANK column
)
}).toDF("CUST_NAME","DIRECTION","BANK_NAME","TXN_AMT","FROM_BANK","TO_BANK")
However, when running the above, I got the following error:
Error:(35, 26) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
val df3 = test_df.map(row => {
I tried modifying the above by creating a statically typed Dataset, but the same problem remains:
import spark.implicits._
case class Record(CUST_NAME: String, DIRECTION: String, BANK_NAME: String, TXN_AMT: Double)
val test_df4 = test_df.as[Record].map(row => {
val direction = row.DIRECTION
if (direction == "D")
(
row.CUST_NAME,
row.DIRECTION,
row.BANK_NAME,
row.TXN_AMT,
row.BANK_NAME, // This will become the FROM_BANK column
null // This will become the TO_BANK column
)
else if (direction == "C")
(
row.CUST_NAME,
row.DIRECTION,
row.BANK_NAME,
row.TXN_AMT,
null, // This will become the FROM_BANK column
row.BANK_NAME // This will become the TO_BANK column
)
}).toDF("CUST_NAME","DIRECTION","BANK_NAME","TXN_AMT","FROM_BANK","TO_BANK")
test_df4.show(100,false)
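As an aside on the encoder error above: the tuples mix bare `null` literals with typed fields, and the if/else-if chain has no final else, so the compiler infers element types like `Null`/`Any`, for which Spark has no encoder. Wrapping the nullable positions in `Option` (using `None` instead of `null`) gives Spark an encodable type. A sketch under that assumption:

```scala
import spark.implicits._

case class Record(CUST_NAME: String, DIRECTION: String, BANK_NAME: String, TXN_AMT: Double)

// Option[String] has an encoder via spark.implicits, unlike a bare null.
val df3 = test_df.as[Record].map { r =>
  val (from, to) =
    if (r.DIRECTION == "D") (Some(r.BANK_NAME), None)
    else (None, Some(r.BANK_NAME))
  (r.CUST_NAME, r.DIRECTION, r.BANK_NAME, r.TXN_AMT, from, to)
}.toDF("CUST_NAME", "DIRECTION", "BANK_NAME", "TXN_AMT", "FROM_BANK", "TO_BANK")
```

Here `None` is rendered as null in the resulting DataFrame columns.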
I know the first option works, but I would like to do this more programmatically, since I need to apply the same logic to many more columns whose values depend on the DIRECTION column. Any feedback or suggestions would be greatly appreciated.
Thanks!
You can put the when statements in a list (or add them to the list programmatically) and then select them. That way you don't need to chain a bunch of withColumn statements. Also note that .otherwise(null) is not required, since that is the default behavior.
val newcols = List(
col("*"),
when($"DIRECTION" === "D", $"BANK_NAME").as("FROM_BANK"),
when($"DIRECTION" === "C", $"BANK_NAME").as("TO_BANK")
)
val df2 = df.select(newcols: _*)
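When the column/direction pairs follow a pattern, the list itself can be built from data. A sketch, assuming a hypothetical mapping from output column name to the DIRECTION value that should populate it:

```scala
import org.apache.spark.sql.functions.{col, when}

// Hypothetical mapping: output column name -> DIRECTION value that populates it.
val directionCols = Seq(
  "FROM_BANK" -> "D",
  "TO_BANK"   -> "C"
  // ...extend with the remaining direction-dependent columns
)

// Keep all original columns, then append one conditional column per mapping entry.
val newcols = col("*") +: directionCols.map { case (name, dir) =>
  when(col("DIRECTION") === dir, col("BANK_NAME")).as(name)
}

val df2 = df.select(newcols: _*)
```

Adding another derived column is then one more entry in `directionCols` rather than another `withColumn` call.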