对于数据集,如何处理新对象实例化?



我必须遵循以下场景

 case class A(name:String)
 class Eq { def isMe(s:String) = s == "ME" }
 val a = List(A("ME")).toDS
 a.filter(l => new Eq().isMe(l.name))

这是否每次为每个执行器上的每个数据点创建一个新对象Eq

不错的一个!我不知道类型化数据集有不同的过滤方法。
为了回答你的问题,我将深入研究Spark的内部结构。

类型化的 Dtaset 上的filter具有以下签名:

def filter(func: T => Boolean): Dataset[T]

请注意,func是用T参数化的,因此Spark需要反序列化你的对象A以及函数。

TypedFilter Main$$$Lambda$, class A, [StructField(name,StringType,true)], newInstance(class A)

其中Main$$$Lambda$是随机生成的函数名称

在优化阶段,如果满足以下条件,则 EliminateSerialization 规则可能会消除它:

ds.map(...).filter(...)可以通过此规则进行优化以节省额外的反序列化,但ds.map(...).as[AnotherType].filter(...)无法优化。

如果规则适用TypedFilter则替换为 Filter

这里的问题是过滤器的condition。事实上,它是另一个名为 Invoke 的特殊表达式,其中:
targetObject是过滤器函数Main$$$Lambda$
functionNameapply,因为它是一个常规的 Scala 函数。

Spark最终以这两种模式之一运行 - generate codeinterpreter。让我们专注于第一个,因为它是默认值。

下面是将生成代码的方法调用的简化堆栈跟踪

SparkPlan.execute 
  //https://github.com/apache/spark/blob/03e30063127fd71bef8a14553381e805fe5b6679/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L596
  -> WholeStageCodegenExec.execute 
   [child: Filter]
    -> child.execute
       [condition Invoke]
     -> Invoke.genCode 
          //https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L345  
          -> doGenCode 

生成阶段后的简化代码:

final class GeneratedIteratorForCodegenStage1 extends BufferedRowIterator {
  private Object[] references;
  private scala.collection.Iterator input;
  private UnsafeRowWriter writer = new UnsafeRowWriter();
  public GeneratedIteratorForCodegenStage1(Object[] references) {
    this.references = references;
  }
  public void init(Iterator inputs) {
    this.inputs = inputs;
  }
  protected void processNext() throws IOException {
    while (input.hasNext() && !stopEarly()) {
      InternalRow row = input.next();
      do {
        //Create A object
        UTF8String value = row.getUTF8String(0));
        A a = new A(value.toString)
        //Filter by A's value
        result = (scala.Function1) references[0].apply(a);
        if (!result) continue;
        writer.write(0, value)
        append((writer.getRow());
      }
      if (shouldStop()) return;
    }
  }
}

我们可以看到投影是用引用变量中传递的对象数组构造的。但是引用变量在哪里以及实例化了多少次呢?
它是在 WholeStageCodegenExec 期间创建的,每个分区仅实例化一次。

这让我们得到的答案是,filter每个分区而不是每个数据点只会创建一次函数,EqA类将按数据点创建。

如果您对它被添加到代码上下文的位置感到好奇:
它发生在这里 javaType在哪里scala.function1. value是实施 - Main$$$Lambda$

最新更新