我正在用纱线作为资源管理器和2个节点在EMR中运行Spark作业。如果未满足我的条件,我需要故意使步骤失败,因此下一步无法按照配置执行。为了实现此目的
它运行良好,但发电机中的记录插入了两次。
以下是我的代码。
if(<condition>) {
<method call to insert in dynamo>
throw new SparkException(<msg>);
return;
}
如果我删除了抛出异常的行,则可以正常工作,但是步骤完成了。
我该如何使步骤失败,而不会两次获得日志消息。
感谢您的帮助。
问候,Sorabh
可能插入了两次发电机消息的原因是因为您的错误条件被两个不同的执行者击中并处理。Spark正在将工作划分为在工人之间进行的工作,而这些工人没有任何知识。
我不确定是什么推动了您的要求使Spark Step失败的要求,但是我建议您改为在您的应用程序代码中跟踪该故障案例,而不是试图直接尝试Spark死亡。换句话说,编写检测错误并将这些代码传递回您的火花驱动程序,然后在适当的情况下采取行动。
做到这一点的一种方法是使用累加器来计算处理数据时发生的任何错误。它看起来会大致像这样(我假设Scala和DataFrames,但是您可以根据需要适应RDD和/或Python):
val accum = sc.longAccumulator("Error Counter")
def doProcessing(a: String, b: String): String = {
if(condition) {
accum.add(1)
null
}
else {
doComputation(a, b)
}
}
val doProcessingUdf = udf(doProcessing _)
df = df.withColumn("result", doProcessing($"a", $"b"))
df.write.format(..).save(..) // Accumulator value not computed until an action occurs!
if(accum.value > 0) {
// An error detected during computation! Do whatever needs to be done.
<insert dynamo message here>
}
这种方法的一件好事是,如果您在Spark UI中寻找反馈,您将能够在运行时看到累加器值。作为参考,这是累加器的文档:http://spark.apache.org/docs/latest/rdd-programming-guide.html#accumulators