Spark 函数平均值和 BigDecimal 的规模问题



我有这个案例类:

case class AllData(positionId: Long, warehouse: String, product: String, amount: BigDecimal, amountTime: Long)

和数据集:

val data: Dataset[AllData]

这个代码:

val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
.agg(
max($"amount").as("maxAmount").as[BigDecimal],
avg($"amount").as("avgAmount").as[BigDecimal]
)

生成以下模式:

root
|-- value: string (nullable = true)
|-- maxAmount: decimal(38,18) (nullable = true)
|-- avgAmount: decimal(38,22) (nullable = true)

avgAmount的比例更大,导致问题,产生错误

org.apache.spark.sql.AnalysisException: Cannot up cast `avgAmount` from decimal(38,22) to decimal(38,18) as it may truncate
The type path of the target object is:
- root class: "scala.math.BigDecimal"

我也试着用这种方式对数据进行舍入:

val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
.agg(
round(max($"amount"), 4).as("maxAmount").as[BigDecimal],
round(avg($"amount"), 4).as("avgAmount").as[BigDecimal]
)

这将架构更改为:

root
|-- value: string (nullable = true)
|-- maxAmount: decimal(38,4) (nullable = true)
|-- avgAmount: decimal(38,4) (nullable = true)

这次的错误是:

org.apache.spark.sql.AnalysisException: Cannot up cast `maxAmount` from decimal(38,4) to decimal(38,18) as it may truncate
The type path of the target object is:
- root class: "scala.math.BigDecimal"

为什么会这样?我该如何防止这种情况发生?我有Spark 2.4

情况似乎是avg函数在小数点上又添加了4个槽,以解决分数除法时对小数位数的增加需求。DecimalType(38,18)表示总共有38个插槽,小数点后有18个插槽(小数点前的区域剩下38-18=20)。

一种更简单的方法是,左边有20个插槽,右边有18个插槽,所以DecimalType(38,18) = (20 left, 18 right)。与DecimalType(38,22) = (16 left, 22 right)类似。当然,将(16,22)推入(20,18)是不可能的,因为将丢失4个左侧插槽(即十进制插槽)(尽管您将有另外4个右侧插槽)。

现在,由于AVG函数似乎在左边(即小数点后)添加了4个插槽,这基本上是(-4 right, +4 left)的操作。一旦这个奇怪的过程完成,我们就被迫将(16 left, 22 right)转换回标准的DecimalType(38,18),即(20 left, 18 right),因为这是Scala/Java的默认值。。。这就是失败的原因,因为我们需要砍掉(即"截断")右侧的4个插槽才能做到这一点。

所以。。。我们需要确保得到的数字能够真正适合这些插槽。因此,如果我们从一开始就从(20 left, 18 right)向下转换到(20 left, 14 right)(即DecimalType(34, 14)),那么(+0 right, +4 left)将使我们回到我们正在寻找的(20 left, 18 right)。换句话说,这样做:

val averageableData = 
data.withColumn("amount", $"amount".cast(DecimalType(34,14)))

然后,这样做应该有效:

averageableData.groupByKey(record => record.warehouse + ", " + record.product)
.agg(
max($"amount").as("maxAmount").as[BigDecimal],
avg($"amount").as("avgAmount").as[BigDecimal]
)

有趣的是,您实际上可以向小于(20 left, 14 right)的任何对象进行强制转换。也就是说,你可以做.cast(DecimalType(33, 13))或任何更少的事情。当(+0 left, +4 right)操作发生时,它仍然适合(20 left, 18 right)

最新更新