在使用 udf 执行操作时,在数据帧中将空值保留为空



我有一个数据帧,其中一列的值为空。

我正在将列传递给 udf 以进行数学乘法。我想跳过空值的 udf。我不想使用 na.fill 来替换空。

我的DF的架构看起来像 数据帧1

root
|-- Name: string (nullable =true)
|-- Value: decimal(38,0) (nullable=true) //This is the col
|-- powValue: integer (nullable=true)
|-- mulValue: integer (nullable=true)


def udfFn(val1: Integer, powVal:Integer, mulVal:Integer) = {
val bd1 = new BigDecimal(val1);
val bd2 =bd1.scakeByPowerTen(-powVal)
val bd3 = new BigDecimal(mulVal)
val bd4=bd2.multiply(bd3)
}
val calUDF=udf({(val1: Integer, powVal:Integer, mulVal:Integer)=> 
udfFn(val1,powVal,mulVal)})
val newDf=DataFrame1.withColumn("Final_Value",calUDF(col("Value"),col("powValue"),col("mulValue")))

我的数据帧 1可以在列中包含 0,空,数字

看起来val1Value实际上应该根据decimal(38,0)的类型进行BigDecimal,所以我将在下面的代码中做出这个假设。

做到这一点的快速方法就是使用一个好的'oleif-else语句。这也可能是性能最高的方式,具体取决于:

def udfFn(val1: BigDecimal, powVal: Int, mulVal: Int): BigDecimal =
if (val1 != null && powVal != null && mulVal != null) {
val mul = new BigDecimal(mulVal)
val1.scaleByPowerTen(-powVal).multiply(mul)
} else {
null
}

我觉得这自己看起来有点丑,如果你想让它读起来更好,这是函数式编程的工作!Optionfor理解救援!(请注意,如果性能有问题,第一个解决方案可能是您的最佳选择)

您可以执行以下操作:

def udfFn(val1: BigDecimal, powVal: Int, mulVal: Int): Option[BigDecimal] =
val r = for {
bd1 <- Option(val1)
pow <- Option(powVal)
mul <- Option(mulVal).map(new BigDecimal(_))
} yield (bd1.scaleByPowerTen(-pow).multiply(mul))

for理解将产生一个SomeOption,只有当每个输入OptionSome时,该才会,否则它将None

我个人更喜欢用Dataset而不是DataFrame来做到这一点,因为我认为它使转换更容易理解,并使架构在每一步都非常明确,并允许您在不依赖 UDF 的情况下编写转换,但绝对最好做您和/或您的组织更舒服的事情。对于Dataset解决方案,我会做几个案例类:

case class NewData(name: Option[String], val1: Option[BigDecimal], powVal: Option[Int], mulVal: Option[Int], finalValue: Option[BigDecimal])
case class SomeData(name: Option[String], val1: Option[BigDecimal], powVal: Option[Int], mulVal: Option[Int]) {
def toNewData: NewData = {
val fv = for {
bd1 <- val1
pow <- powVal
mul <- mulVal.map(new BigDecimal(_))
} yield (bd1.scaleByPowerTen(-pow).multiply(mul))
NewData(name, val1, powVal, mulVal, fv)
}
}

然后,执行转换的代码将是这样的:

import spark.implicits._
val ds: Dataset[SomeData] = ... // Obtained however you wish
val finalDs: Dataset[NewData] = ds.map(_.toNewData)

相关内容

  • 没有找到相关文章

最新更新