I have this case class:
case class AllData(positionId: Long, warehouse: String, product: String, amount: BigDecimal, amountTime: Long)
and this Dataset:
val data: Dataset[AllData]
The following code runs fine:
val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(max($"amount").as("maxAmount").as[DecimalType])
statisticForAmounts.show(5)
But when I call:
statisticForAmounts.collect()
I get this strange error:
org.apache.spark.sql.AnalysisException: Can't extract value from maxAmount#101: need struct type but got decimal(38,18)
Here is my schema:
root
|-- value: string (nullable = true)
|-- maxAmount: decimal(38,18) (nullable = true)
What is the cause of this problem, and how can I fix it?
The likely cause is that `DecimalType` is Spark's schema descriptor, not the Scala value type: it is a case class with `precision` and `scale` fields, so `.as[DecimalType]` builds an encoder that expects a struct column — exactly what the error message says ("need struct type but got decimal(38,18)"). One simple workaround is to change the field to `Double` and encode as `.as[Double]`:
case class AllData(positionId: Long, warehouse: String, product: String, amount: Double, amountTime: Long)
val data1 = AllData(1,"WC","FC",12.11,123)
val data = spark.createDataset(Seq(data1))
val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(max($"amount").as("maxAmount").as[Double])
scala> statisticForAmounts.show
+------+---------+
| value|maxAmount|
+------+---------+
|WC, FC| 12.11|
+------+---------+
scala> statisticForAmounts.collect
res36: Array[(String, Double)] = Array((WC, FC,12.11))
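If losing precision by switching to `Double` is a concern, another option is to keep a decimal type and pass an explicit encoder. This is only a sketch (not from the original answer) using Spark's built-in `Encoders.DECIMAL`, which targets `java.math.BigDecimal`; the session setup and sample row are assumptions for illustration:

```scala
import org.apache.spark.sql.{Encoders, SparkSession}
import org.apache.spark.sql.functions.max

// Keep decimal precision by supplying an explicit encoder instead of .as[Double].
case class AllData(positionId: Long, warehouse: String, product: String,
                   amount: java.math.BigDecimal, amountTime: Long)

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
import spark.implicits._

val data = spark.createDataset(Seq(
  AllData(1L, "WC", "FC", new java.math.BigDecimal("12.11"), 123L)))

val statisticForAmounts = data
  .groupByKey(record => record.warehouse + ", " + record.product)
  // Encoders.DECIMAL is Spark's built-in encoder for java.math.BigDecimal
  .agg(max($"amount").as("maxAmount").as[java.math.BigDecimal](Encoders.DECIMAL))

statisticForAmounts.collect()  // no AnalysisException
```

The trade-off is that the aggregated column comes back as `java.math.BigDecimal` rather than `scala.math.BigDecimal`.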
After upgrading Spark from version 2.1 to 2.4, I was able to replace:
val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(max($"amount").as("maxAmount").as[DecimalType])
with:
val statisticForAmounts = data.groupByKey(record => record.warehouse + ", " + record.product)
  .agg(max($"amount").as("maxAmount").as[BigDecimal])
and everything runs smoothly. Before the upgrade, there was no encoder available for BigDecimal.
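For reference, on Spark 2.4+ the implicit `Encoder[scala.math.BigDecimal]` comes into scope via `import spark.implicits._`, which is what makes `.as[BigDecimal]` compile and collect. A minimal end-to-end sketch (session setup and sample row are assumptions, reconstructed from the question's case class):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.max

case class AllData(positionId: Long, warehouse: String, product: String,
                   amount: BigDecimal, amountTime: Long)

val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()
import spark.implicits._  // provides the Encoder[BigDecimal] on Spark 2.4+

val data = spark.createDataset(Seq(
  AllData(1L, "WC", "FC", BigDecimal("12.11"), 123L)))

val statisticForAmounts = data
  .groupByKey(record => record.warehouse + ", " + record.product)
  // .as[BigDecimal] now resolves against the implicit scala BigDecimal encoder
  .agg(max($"amount").as("maxAmount").as[BigDecimal])

statisticForAmounts.collect()  // works: no AnalysisException
```

Note that the collected values carry Spark's default decimal(38,18) scale, but scala `BigDecimal` equality compares by value, so `BigDecimal("12.11")` still matches.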