我设计了以下函数来处理任何数值类型的数组:
def array_sum[T](item:Traversable[T])(implicit n:Numeric[T]) = item.sum
// Registers a function as a UDF so it can be used in SQL statements.
sqlContext.udf.register("array_sumD", array_sum(_:Seq[Float]))
但是想要传递一个类型的数组会让我浮出以下错误:
// Now we can use our function directly in SparkSQL.
sqlContext.sql("SELECT array_sumD(array(5.0,1.0,2.0)) as array_sum").show
错误:
cannot resolve 'UDF(array(5.0,1.0,2.0))' due to data type mismatch: argument 1 requires array<double> type, however, 'array(5.0,1.0,2.0)' is of array<decimal(2,1)> type;
Spark-SQL 中十进制值的默认数据类型是十进制。如果将查询中的文本转换为浮点数,并使用相同的 UDF,则它有效:
sqlContext.sql(
"""SELECT array_sumD(array(
| CAST(5.0 AS FLOAT),
| CAST(1.0 AS FLOAT),
| CAST(2.0 AS FLOAT)
|)) as array_sum""".stripMargin).show
结果,正如预期的那样:
+---------+
|array_sum|
+---------+
| 8.0|
+---------+
或者,如果您确实想使用小数(以避免浮点问题),您仍然必须使用强制转换来获得正确的精度,而且您将无法使用 Scala 的漂亮Numeric
和sum
,因为小数被读取为java.math.BigDecimal
。所以 - 你的代码将是:
def array_sum(item:Traversable[java.math.BigDecimal]) = item.reduce((a, b) => a.add(b))
// Registers a function as a UDF so it can be used in SQL statements.
sqlContext.udf.register("array_sumD", array_sum(_:Seq[java.math.BigDecimal]))
sqlContext.sql(
"""SELECT array_sumD(array(
| CAST(5.0 AS DECIMAL(38,18)),
| CAST(1.0 AS DECIMAL(38,18)),
| CAST(2.0 AS DECIMAL(38,18))
|)) as array_sum""".stripMargin).show