我有一个数据帧,其中一列的值为空。
我正在将列传递给 udf 以进行数学乘法。我想跳过空值的 udf。我不想使用 na.fill 来替换空。
我的DF的架构看起来像 数据帧1
root
|-- Name: string (nullable =true)
|-- Value: decimal(38,0) (nullable=true) //This is the col
|-- powValue: integer (nullable=true)
|-- mulValue: integer (nullable=true)
def udfFn(val1: Integer, powVal:Integer, mulVal:Integer) = {
val bd1 = new BigDecimal(val1);
val bd2 =bd1.scakeByPowerTen(-powVal)
val bd3 = new BigDecimal(mulVal)
val bd4=bd2.multiply(bd3)
}
val calUDF=udf({(val1: Integer, powVal:Integer, mulVal:Integer)=>
udfFn(val1,powVal,mulVal)})
val newDf=DataFrame1.withColumn("Final_Value",calUDF(col("Value"),col("powValue"),col("mulValue")))
我的数据帧 1可以在值列中包含 0,空,数字
看起来val1
和Value
实际上应该根据decimal(38,0)
的类型进行BigDecimal
,所以我将在下面的代码中做出这个假设。
做到这一点的快速方法就是使用一个好的'oleif-else
语句。这也可能是性能最高的方式,具体取决于:
def udfFn(val1: BigDecimal, powVal: Int, mulVal: Int): BigDecimal =
if (val1 != null && powVal != null && mulVal != null) {
val mul = new BigDecimal(mulVal)
val1.scaleByPowerTen(-powVal).multiply(mul)
} else {
null
}
我觉得这自己看起来有点丑,如果你想让它读起来更好,这是函数式编程的工作!Option
和for
理解救援!(请注意,如果性能有问题,第一个解决方案可能是您的最佳选择)
您可以执行以下操作:
def udfFn(val1: BigDecimal, powVal: Int, mulVal: Int): Option[BigDecimal] =
val r = for {
bd1 <- Option(val1)
pow <- Option(powVal)
mul <- Option(mulVal).map(new BigDecimal(_))
} yield (bd1.scaleByPowerTen(-pow).multiply(mul))
for
理解将产生一个Some
Option
,只有当每个输入Option
都Some
时,该才会,否则它将None
。
我个人更喜欢用Dataset
而不是DataFrame
来做到这一点,因为我认为它使转换更容易理解,并使架构在每一步都非常明确,并允许您在不依赖 UDF 的情况下编写转换,但绝对最好做您和/或您的组织更舒服的事情。对于Dataset
解决方案,我会做几个案例类:
case class NewData(name: Option[String], val1: Option[BigDecimal], powVal: Option[Int], mulVal: Option[Int], finalValue: Option[BigDecimal])
case class SomeData(name: Option[String], val1: Option[BigDecimal], powVal: Option[Int], mulVal: Option[Int]) {
def toNewData: NewData = {
val fv = for {
bd1 <- val1
pow <- powVal
mul <- mulVal.map(new BigDecimal(_))
} yield (bd1.scaleByPowerTen(-pow).multiply(mul))
NewData(name, val1, powVal, mulVal, fv)
}
}
然后,执行转换的代码将是这样的:
import spark.implicits._
val ds: Dataset[SomeData] = ... // Obtained however you wish
val finalDs: Dataset[NewData] = ds.map(_.toNewData)