(Scala特定问题。(
虽然Spark文档鼓励在可能的情况下使用DataFrame API,但如果DataFrame API不足,则通常在返回RDD API或使用UDF之间进行选择。这两种备选方案之间是否存在固有的性能差异?
RDD和UDF的相似之处在于,它们都无法从Catalyst和Tungsten优化中获益。还有其他开销吗?如果有,这两种方法之间有区别吗?
举一个具体的例子,假设我有一个DataFrame,它包含一列具有自定义格式的文本数据(不适用于regexp匹配(。我需要解析该列,并添加一个新的向量列,其中包含生成的令牌。
它们都不能从催化剂和钨优化中获益
这并不完全正确。虽然UDF没有从Tungsten优化中受益(可以说简单的SQL转换也没有得到巨大的提升(,但您仍然可以从Catalyst提供的执行计划优化中受益。让我们用一个简单的例子来说明这一点(注意:Spark 2.0和Scala。不要将其外推到早期版本,尤其是PySpark(:
val f = udf((x: String) => x == "a")
val g = udf((x: Int) => x + 1)
val df = Seq(("a", 1), ("b", 2)).toDF
df
.groupBy($"_1")
.agg(sum($"_2").as("_2"))
.where(f($"_1"))
.withColumn("_2", g($"_2"))
.select($"_1")
.explain
// == Physical Plan ==
// *HashAggregate(keys=[_1#2], functions=[])
// +- Exchange hashpartitioning(_1#2, 200)
// +- *HashAggregate(keys=[_1#2], functions=[])
// +- *Project [_1#2]
// +- *Filter UDF(_1#2)
// +- LocalTableScan [_1#2, _2#3]
执行计划向我们展示了几件事:
- 在聚合之前,已按下选择
- 投影在聚合之前已经被按下,并有效地删除了第二个UDF调用
根据数据和管道的不同,这几乎可以免费提供实质性的性能提升。
也就是说,RDD和UDF都需要在安全和不安全之间进行迁移,而后者的灵活性要低得多。尽管如此,如果您只需要一个简单的类似map
的行为,而不初始化昂贵的对象(如数据库连接(,那么UDF就是您的选择。
在稍微复杂一点的场景中,您可以很容易地降到通用Dataset
,并为您真正需要访问一些低级别功能(如自定义分区(的情况保留RDDs
。
(注意:我没有为此测量备份(
对我来说,shuffle和(去(序列化是主要的成本。但在这些之后,拥有干净的代码是最重要的。考虑到这一点:
使用RDD操作的主要缺点是需要将/序列化为完整的jvm对象。而使用udf可能只(反(序列化所需的列。请注意,对于其他我不知道的数据格式,这是在处理面向列的数据(如镶木地板(时发生的,但在许多情况下,两者都具有相似的性能。
因此,如果你的算法主要是过滤和混洗操作,和/或可以简单地用数据帧操作和本地udf来表示,你应该使用它们。然而,如果您的算法需要对多个列进行复杂的处理,那么最好提前支付反序列化费用,并在jvm对象上执行干净高效的scala代码。
因此,根据我实现复杂数学算法的个人经验,我通常将代码分为两个步骤:
- 纯dataframe操作可以完成过滤、联接和groupBy操作。在极少数情况下,当需要无法使用dataframe方法表达的特定本地操作时(如果它只需要很少的列(,我可以使用udf
- 然后转换为rdd,并对数学和/或复杂查找部分使用(平面(map-op