我必须遵循以下场景
case class A(name:String)
class Eq { def isMe(s:String) = s == "ME" }
val a = List(A("ME")).toDS
a.filter(l => new Eq().isMe(l.name))
这是否每次为每个执行器上的每个数据点创建一个新对象Eq
?
不错的一个!我不知道类型化数据集有不同的过滤方法。
为了回答你的问题,我将深入研究Spark的内部结构。
类型化的 Dtaset 上的filter
具有以下签名:
def filter(func: T => Boolean): Dataset[T]
请注意,func
是用T
参数化的,因此Spark需要反序列化你的对象A
以及函数。
TypedFilter Main$$$Lambda$, class A, [StructField(name,StringType,true)], newInstance(class A)
其中Main$$$Lambda$
是随机生成的函数名称
在优化阶段,如果满足以下条件,则 EliminateSerialization 规则可能会消除它:
ds.map(...).filter(...)
可以通过此规则进行优化以节省额外的反序列化,但ds.map(...).as[AnotherType].filter(...)
无法优化。
如果规则适用TypedFilter
则替换为 Filter
。
这里的问题是过滤器的condition
。事实上,它是另一个名为 Invoke 的特殊表达式,其中:targetObject
是过滤器函数Main$$$Lambda$
functionName
是apply
,因为它是一个常规的 Scala 函数。
Spark最终以这两种模式之一运行 - generate code
或interpreter
。让我们专注于第一个,因为它是默认值。
下面是将生成代码的方法调用的简化堆栈跟踪
SparkPlan.execute
//https://github.com/apache/spark/blob/03e30063127fd71bef8a14553381e805fe5b6679/sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala#L596
-> WholeStageCodegenExec.execute
[child: Filter]
-> child.execute
[condition Invoke]
-> Invoke.genCode
//https://github.com/apache/spark/blob/branch-2.4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/objects/objects.scala#L345
-> doGenCode
生成阶段后的简化代码:
final class GeneratedIteratorForCodegenStage1 extends BufferedRowIterator {
private Object[] references;
private scala.collection.Iterator input;
private UnsafeRowWriter writer = new UnsafeRowWriter();
public GeneratedIteratorForCodegenStage1(Object[] references) {
this.references = references;
}
public void init(Iterator inputs) {
this.inputs = inputs;
}
protected void processNext() throws IOException {
while (input.hasNext() && !stopEarly()) {
InternalRow row = input.next();
do {
//Create A object
UTF8String value = row.getUTF8String(0));
A a = new A(value.toString)
//Filter by A's value
result = (scala.Function1) references[0].apply(a);
if (!result) continue;
writer.write(0, value)
append((writer.getRow());
}
if (shouldStop()) return;
}
}
}
我们可以看到投影是用引用变量中传递的对象数组构造的。但是引用变量在哪里以及实例化了多少次呢?
它是在 WholeStageCodegenExec 期间创建的,每个分区仅实例化一次。
这让我们得到的答案是,filter
每个分区而不是每个数据点只会创建一次函数,Eq
和A
类将按数据点创建。
如果您对它被添加到代码上下文的位置感到好奇:
它发生在这里 javaType
在哪里scala.function1
. value
是实施 - Main$$$Lambda$