Spark error "org.apache.spark.sql.AnalysisException: Can't extract value: need struct type but got decimal(38,18)"

I have this case class:

case class AllData(positionId: Long, warehouse: String, product: String, amount: BigDecimal, amountTime: Long)

and this Dataset:

val data: Dataset[AllData]

and this code runs fine:

val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(
    max($"amount").as("maxAmount").as[DecimalType])
statisticForAmounts.show(5)

But when I call:

statisticForAmounts.collect()

I get this strange error:

org.apache.spark.sql.AnalysisException: Can't extract value from maxAmount#101: need struct type but got decimal(38,18)

Here is my schema:

root
|-- value: string (nullable = true)
|-- maxAmount: decimal(38,18) (nullable = true)

What is the cause of the problem and how can I fix it?

This is probably due to an incompatibility between Spark's and Scala's BigDecimal. You may want to change the field to Double and try that:

case class AllData(positionId: Long, warehouse: String, product: String, amount: Double, amountTime: Long)
val data1 = AllData(1,"WC","FC",12.11,123)
val data = spark.createDataset(Seq(data1))
val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(max($"amount").as("maxAmount").as[Double])
scala> statisticForAmounts.show
+------+---------+                                                              
| value|maxAmount|
+------+---------+
|WC, FC|    12.11|
+------+---------+

scala> statisticForAmounts.collect
res36: Array[(String, Double)] = Array((WC, FC,12.11))
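
As a side note (my own suggestion, not something the change to Double relies on): if you would rather keep the BigDecimal field from the question, you can pass Spark's built-in encoder for java.math.BigDecimal explicitly, which sidesteps the missing implicit encoder:

import org.apache.spark.sql.Encoders
import org.apache.spark.sql.functions.max

// data is the original Dataset[AllData] from the question (amount: BigDecimal).
// Supplying Encoders.DECIMAL explicitly keeps decimal(38,18) precision on collect.
val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(max($"amount").as("maxAmount").as[java.math.BigDecimal](Encoders.DECIMAL))

statisticForAmounts.collect()   // Array[(String, java.math.BigDecimal)]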

After upgrading Spark from version 2.1 to 2.4, I was able to replace this code:

val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(
    max($"amount").as("maxAmount").as[DecimalType])

with:

val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(
    max($"amount").as("maxAmount").as[BigDecimal])

and everything ran smoothly. Before the upgrade, there was no encoder available for BigDecimal.
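
For completeness, here is a minimal self-contained sketch of the post-upgrade (Spark 2.4) approach described above; the SparkSession setup and the sample row are my own additions for illustration:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

case class AllData(positionId: Long, warehouse: String, product: String, amount: BigDecimal, amountTime: Long)

val spark = SparkSession.builder().master("local[*]").appName("maxAmount").getOrCreate()
import spark.implicits._   // in Spark 2.4 this brings in an implicit encoder for scala.math.BigDecimal

// Illustrative sample row; the values are made up
val data = Seq(AllData(1L, "WC", "FC", BigDecimal("12.11"), 123L)).toDS()

val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(max($"amount").as("maxAmount").as[BigDecimal])

statisticForAmounts.collect()   // Array[(String, BigDecimal)]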