RDD API对与DataFrame API混合的UDF的性能影响



(Scala特定问题。(

虽然Spark文档鼓励在可能的情况下使用DataFrame API,但如果DataFrame API不足,则通常在返回RDD API或使用UDF之间进行选择。这两种备选方案之间是否存在固有的性能差异?

RDD和UDF的相似之处在于,它们都无法从Catalyst和Tungsten优化中获益。还有其他开销吗?如果有,这两种方法之间有区别吗?

举一个具体的例子,假设我有一个DataFrame,它包含一列具有自定义格式的文本数据(不适用于regexp匹配(。我需要解析该列,并添加一个新的向量列,其中包含生成的令牌。

它们都不能从催化剂和钨优化中获益

这并不完全正确。虽然UDF没有从Tungsten优化中受益(可以说简单的SQL转换也没有得到巨大的提升(,但您仍然可以从Catalyst提供的执行计划优化中受益。让我们用一个简单的例子来说明这一点(注意:Spark 2.0和Scala。不要将其外推到早期版本,尤其是PySpark(:

val f = udf((x: String) => x == "a")
val g = udf((x: Int) => x + 1)
val df = Seq(("a", 1), ("b", 2)).toDF
df
  .groupBy($"_1")
  .agg(sum($"_2").as("_2"))
  .where(f($"_1"))
  .withColumn("_2", g($"_2"))
  .select($"_1")
  .explain
// == Physical Plan ==
// *HashAggregate(keys=[_1#2], functions=[])
// +- Exchange hashpartitioning(_1#2, 200)
//    +- *HashAggregate(keys=[_1#2], functions=[])
//       +- *Project [_1#2]
//          +- *Filter UDF(_1#2)
//             +- LocalTableScan [_1#2, _2#3]

执行计划向我们展示了几件事:

  • 在聚合之前,已按下选择
  • 投影在聚合之前已经被按下,并有效地删除了第二个UDF调用

根据数据和管道的不同,这几乎可以免费提供实质性的性能提升。

也就是说,RDD和UDF都需要在安全和不安全之间进行迁移,而后者的灵活性要低得多。尽管如此,如果您只需要一个简单的类似map的行为,而不初始化昂贵的对象(如数据库连接(,那么UDF就是您的选择。

在稍微复杂一点的场景中,您可以很容易地降到通用Dataset,并为您真正需要访问一些低级别功能(如自定义分区(的情况保留RDDs

(注意:我没有为此测量备份(

对我来说,shuffle和(去(序列化是主要的成本。但在这些之后,拥有干净的代码是最重要的。考虑到这一点:

使用RDD操作的主要缺点是需要将/序列化为完整的jvm对象。而使用udf可能只(反(序列化所需的列。请注意,对于其他我不知道的数据格式,这是在处理面向列的数据(如镶木地板(时发生的,但在许多情况下,两者都具有相似的性能。

因此,如果你的算法主要是过滤和混洗操作,和/或可以简单地用数据帧操作和本地udf来表示,你应该使用它们。然而,如果您的算法需要对多个列进行复杂的处理,那么最好提前支付反序列化费用,并在jvm对象上执行干净高效的scala代码。

因此,根据我实现复杂数学算法的个人经验,我通常将代码分为两个步骤:

  1. 纯dataframe操作可以完成过滤、联接和groupBy操作。在极少数情况下,当需要无法使用dataframe方法表达的特定本地操作时(如果它只需要很少的列(,我可以使用udf
  2. 然后转换为rdd,并对数学和/或复杂查找部分使用(平面(map-op

相关内容

  • 没有找到相关文章

最新更新