我正在使用Spark 2.2。我想在固定尺寸数组中标准化每个值。
输入
{"values": [1,2,3,4]}
输出
{"values": [0.25, 0.5, 0.75, 1] }
目前,我正在使用 udf :
val f = udf { (l: Seq[Double]) =>
val max = l.max
l.map(_ / max)
}
是否有一种方法可以避免使用UDF(以及相关的性能罚款)。
说每个数组中的记录数为 n
val n: Int
然后
import org.apache.spark.sql.functions._
df
.withColumn("max", greatest((0 until n).map(i => col("value")(i)): _*))
.withColumn("values", array((0 until n).map(i => col("value")(i) / col("max")): _*))
我想出了我的UDF的优化版本,该版本执行了现场更新。
val optimizedNormalizeUdf = udf { (l: mutable.WrappedArray[Double]) =>
val max = l.max
(0 until n).foreach(i => l.update(i, l(i) / max))
l
}
我已经写了一个基准测试,以检查用户8838736提出的解决方案的性能。这是结果。
[info] Benchmark Mode Cnt Score Error Units
[info] NormalizeBenchmark.builtin avgt 10 140,293 ± 10,805 ms/op
[info] NormalizeBenchmark.udf_naive avgt 10 104,708 ± 7,421 ms/op
[info] NormalizeBenchmark.udf_optimized avgt 10 99,492 ± 7,829 ms/op
结论:在这种情况下, udf 是性能最大的解决方案。
ps:对于那些有兴趣的人,基准的源代码在这里:https://github.com/yannmoisan/spark-jmh