我正在编写一个与Spark的JDBC数据源实现有相似之处的数据源,我想问Spark如何处理某些故障场景。据我所知,如果执行器在运行任务时死亡,Spark将恢复执行器并尝试重新运行该任务。然而,在数据完整性和Spark的JDBC数据源API(例如df.write.format("jdbc").option(...).save()
)的背景下,这是如何实现的?
在JdbcUtils.scala的savePartition函数中,我们可以看到Spark调用从用户提供的数据库url/credentials生成的Java连接对象的提交和回滚函数(见下文)。但是,如果执行器在commit()完成后或调用rollback()之前死亡,Spark是否会尝试重新运行任务并再次写入相同的数据分区,从而在数据库中创建重复的提交行?如果执行器在调用commit()或rollback()的过程中死亡,会发生什么?
try {
...
if (supportsTransactions) {
conn.commit()
}
committed = true
Iterator.empty
} catch {
case e: SQLException =>
...
throw e
} finally {
if (!committed) {
// The stage must fail. We got here through an exception path, so
// let the exception through unless rollback() or close() want to
// tell the user about another problem.
if (supportsTransactions) {
conn.rollback()
}
conn.close()
} else {
...
}
}
但是,如果执行器在commit()完成后或调用rollback()之前立即死亡,Spark是否会尝试重新运行任务并再次写入相同的数据分区,从而在数据库中创建重复的提交行?
由于Spark SQL(它是一个基于RDD API的高级API)并不真正了解JDBC或任何其他协议的所有特性,您会有什么期望?更不用说底层的执行运行时,即Spark Core。
当您编写像df.write.format(“jdbc”).option(...).save()
Spark SQL这样的结构化查询时,可以使用像RDD API这样的低级汇编将其转换为分布式计算。由于Spark SQL试图接受尽可能多的"协议"(包括JDBC),它的DataSource API将大部分错误处理留给了数据源本身。
Spark调度任务的核心(不知道甚至不在乎任务做什么)只是监视执行,如果任务失败,它会尝试再次执行(默认情况下,直到3次尝试失败)。
因此,当您编写自定义数据源时,您知道演练,并且必须在代码中处理此类重试。
处理错误的一种方法是使用TaskContext(例如addTaskCompletionListener
或addTaskFailureListener
)注册任务侦听器。
由于上述原因,我不得不引入一些重复数据消除逻辑。你可能会两次(或更多次)做出同样的承诺。