如何在 Spark 2.3.0 UDF 中为每个工作线程构造和持久化引用对象?



在 Spark 2.3.0 结构化流式处理作业中,我需要将一列追加到数据帧,该列派生自现有列的同一行的值。

我想在 UDF 中定义此转换,并使用 withColumn 来构建新的数据帧。

执行此转换需要咨询构造成本非常高的引用对象 - 每条记录构造一次会产生不可接受的性能。

为每个工作节点构造和保留此对象一次的最佳方法是什么,以便可以对每批中的每个记录重复引用它?请注意,该对象不可序列化。

我目前的尝试围绕着子类化 UserDefinedFunction 将昂贵的对象添加为惰性成员,并为这个子类提供一个替代构造函数,该子类执行通常由 udf 函数执行的 init,但到目前为止,我一直无法让它做udf做的那种类型强制——当我的转换 lambda 处理字符串时,一些深层类型推断需要org.apache.spark.sql.Column类型的对象输入和输出。

像这样:

import org.apache.spark.sql.expressions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DataType
class ExpensiveReference{
def ExpensiveReference() = ... // Very slow
def transformString(in:String) = ... // Fast
}
class PersistentValUDF(f: AnyRef, dataType: DataType, inputTypes: Option[Seq[DataType]]) extends UserDefinedFunction(f: AnyRef, dataType: DataType, inputTypes: Option[Seq[DataType]]){  
lazy val ExpensiveReference = new ExpensiveReference()
def PersistentValUDF(){
this(((in:String) => ExpensiveReference.transformString(in) ):(String => String), StringType, Some(List(StringType)))
}
}

我越深入这个兔子洞,我就越怀疑有更好的方法来实现这一目标,而我却忽略了。因此,这篇文章。

编辑: 我测试了在 UDF 中声明的对象中延迟初始化引用;这将触发重新初始化。示例代码和对象

class IntBox {
var valu = 0;
def increment {
valu = valu + 1
}
def get:Int ={
return valu
}
}

val altUDF = udf((input:String) => {
object ExpensiveRef{
lazy val box = new IntBox
def transform(in:String):String={
box.increment
return in + box.get.toString
}
}
ExpensiveRef.transform(input)
})

上面的 UDF 总是附加 1;因此惰性对象正在按记录重新初始化。

我找到了这篇文章,我的选项 1 能够变成一个可行的解决方案。最终结果与Jacek Laskowski的答案相似,但进行了一些调整:

  1. 将对象定义拉出 UDF 的范围。即使很懒惰,如果在 UDF 范围内定义,它仍然会重新初始化。
  2. 将转换函数从对象移到 UDF 的 lambda 中(为避免序列化错误而必需(
  3. 在 UDF lambda 的闭包中捕获对象的惰性成员

像这样:

object ExpensiveReference {
lazy val ref = ...
}
val persistentUDF = udf((input:String)=>{
/*transform code that references ExpensiveReference.ref*/
})

免责声明让我试一试,但请将其视为正在进行的工作(反对票是很大的禁忌:)(

我要做的是使用带有lazy val的 Scala 对象作为昂贵的引用。

object ExpensiveReference {
lazy val ref = ???
def transform(in:String) = {
// use ref here
}
}

对于该对象,无论您在 Spark 执行器上做什么(无论是 UDF 的一部分还是任何其他计算的一部分(,都将在第一次访问时实例化ExpensiveReference.ref。您可以直接访问它或transform的一部分。

同样,在 UDF 或 UDAF 或任何其他转换中执行此操作并不重要。关键是,一旦在Spark执行器上发生计算,"构造一个非常昂贵的引用对象 - 每条记录构造一次会产生不可接受的性能。

它可能在UDF中(只是为了更清楚(。

最新更新