考虑一个 AWS Glue 任务片段:
val input = glueContext
.getCatalogSource(database = "my_db", tableName = "my_table")
.getDynamicFrame()
val myLimit = 10
if (input.count() <= myLimit) {
// end glue job here with error
}
// continue execution
如何退出具有错误状态的作业?如果我只是跳过执行,它只会以成功结束;如果我抛出异常,它会失败并出现异常。我可以只调用一些东西来停止具有失败/错误状态的作业而不引发异常吗?
更新
乍一看,我可以:
val spark: SparkContext = SparkContext.getOrCreate()
val glueContext: GlueContext = new GlueContext(spark)
val jobId = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_ID").toArray)("JOB_ID")
spark.cancelJob(jobId)
但:
-
SparkContext
来自内部框架,结束作业可能会导致不可预测(不稳定(的结果。 -
org.apache.spark.SparkContext#cancelJob
接收Int
,而 AWS Glue 有一个这样的String
JOB_ID
:j_aaa11111a1a11a111a1aaa11a11111aaa11a111a1111111a111a1a1aa111111a
。因此,它不能直接传递给cancelJob
。
这是写成pyspark的,因为这是我所知道的
args = getResolvedOptions(
sys.argv, ["TempDir", "JOB_NAME"]
)
job = Job(glue_context)
job.init(args["JOB_NAME"], args)
if my_check() == False:
# you can use any other exit code and glue will still report failure
# because the job is not committed
sys.exit(0)
do_normal_stuff()
job.commit()
火花作业和粘合作业是不同的东西,这就是为什么你不能互换它们的 ID 的原因。