构成Spark UDF时是否会受到惩罚(而不是将UDF作为一个)



我想知道是否通过构成Spark UDF会受到性能惩罚。通常,我更喜欢构成一件事情的小功能……

这是一个微不足道的例子,给定一个DataFrame DF:

def inc = udf( (i: Double) => i + 1)
def double = udf( (i: Double) => i * 2)
df.withColumn("r", double(inc($"c")))

vers

def incAndDouble = udf( (i: Double) => (i + 1) * 2)
df.withColumn("r", incAndDouble($"c")

从我所看到的,在这个简单的示例中性能是相同的。

你能解释为什么吗?火花如何在现场工作?

它总是正确的吗?

[update]

我可能会在巧妙的组合(不仅是简单功能组成)时进行反例,如以下示例

def filter = udf((s: Seq[String]) => s.startsWith("A"))
def size = udf((s: Seq[String]) => s.size)
val filterAndSize = udf((s: Seq[String]) => s.count(_.startsWith("A")))

所以,我想滤镜和尺寸是可取的,因为它可以避免某些中介集合实例化。

tl; dr 可能会有某些性能退化或罚款,但可以忽略不计。

你能解释为什么吗?

用"解释"看到您的问题非常有趣,这正是用于查看Spark SQL封面上发生什么的方法的名称及其如何执行查询:)

因此,使用Dataset.explain甚至更多的详细版本Dataset.explain(extended = true)查看所有优化(以及可能的性能降解)。

def inc = udf( (i: Double) => i + 1)
def double = udf( (i: Double) => i * 2)
val df = Seq(1,2,3).toDF("c")
val q = df.withColumn("r", double(inc($"c")))

用两个UDF组成了计划,如下所示。

scala> q.explain(extended = true)
== Parsed Logical Plan ==
'Project [c#3, UDF(UDF('c)) AS r#10]
+- AnalysisBarrier Project [value#1 AS c#3]
== Analyzed Logical Plan ==
c: int, r: double
Project [c#3, if (isnull(if (isnull(cast(c#3 as double))) null else UDF(cast(c#3 as double)))) null else UDF(if (isnull(cast(c#3 as double))) null else UDF(cast(c#3 as double))) AS r#10]
+- Project [value#1 AS c#3]
   +- LocalRelation [value#1]
== Optimized Logical Plan ==
LocalRelation [c#3, r#10]
== Physical Plan ==
LocalTableScan [c#3, r#10]

让我们看看一个UDF的计划,这是两个UDF的组合。

def incAndDouble = udf( (i: Double) => (i + 1) * 2)
val q = df.withColumn("r", incAndDouble($"c"))
scala> q.explain(extended = true)
== Parsed Logical Plan ==
'Project [c#3, UDF('c) AS r#16]
+- AnalysisBarrier Project [value#1 AS c#3]
== Analyzed Logical Plan ==
c: int, r: double
Project [c#3, if (isnull(cast(c#3 as double))) null else UDF(cast(c#3 as double)) AS r#16]
+- Project [value#1 AS c#3]
   +- LocalRelation [value#1]
== Optimized Logical Plan ==
LocalRelation [c#3, r#16]
== Physical Plan ==
LocalTableScan [c#3, r#16]

在这种特殊情况下,差异不是,因为在查询中的物理计划是相同的,即LocalTableScan

它可能与其他数据源(例如文件或JDBC)有所不同,但是我个人的建议是开发UDF,尽可能小,因为它们是Spark Optimizer的黑盒子。


它总是正确的吗?

否,一点都不重要,因为它在很大程度上取决于您在UDF中所做的工作(但这与是否首先写UDF有关)。

对于以下UDF为谓词(即返回布尔值):

def filter = udf((s: Seq[String]) => s.startsWith("A"))

Spark可以优化UDF的使用(如果是不是 a udf,而是简单的filter操作),然后将其推到数据源以加载更少的数据。这可能会对性能产生巨大影响。

相关内容

  • 没有找到相关文章

最新更新