Is there a way to correctly catch the exception 'org.apache.spark.sql.delta.ConcurrentAppendException' in Python?



Several data pipelines, whose concurrency I cannot control, update the same Delta table and can trigger a concurrent-append exception. I want to catch the exception and retry the table update, but even after adding a try block the exception does not seem to be handled. Any suggestions are welcome.

for retry in range(3):
    try:
        # Update delta, retry in case of concurrency
        (curatedTable.alias("staged")
            .merge(updateDF.alias("curated"),
                   "staged.ExperienceId = curated.ExperienceId AND staged.ExperienceVersion = curated.ExperienceVersion")
            .whenMatchedUpdate(set={"staged.updated": "True"})
            .execute())
        break
    except Exception as e:
        if retry < 2:
            continue
        else:
            raise Exception("RETRY FAILED")

Instead of the 'RETRY FAILED' message, I get the following:

Operation on target Notebook Synapse failed: Py4JJavaError: An error occurred while calling o42198.save.
: org.apache.spark.sql.delta.ConcurrentAppendException: Files were added to the root of the table by a concurrent update. Please try the operation again.
Conflicting commit: ......

You need to import ConcurrentAppendException from delta.exceptions and then catch that specific exception. Worked for me.
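A sketch of that approach, factored into a generic retry helper so the catch is limited to the retriable exception type. The helper itself is hypothetical (not from the original post); it assumes the delta-spark package, whose delta.exceptions module exposes ConcurrentAppendException:

```python
import time

def merge_with_retry(do_merge, retriable_exc, max_attempts=3, backoff_s=1.0):
    """Run do_merge(), retrying only when retriable_exc is raised."""
    for attempt in range(1, max_attempts + 1):
        try:
            return do_merge()
        except retriable_exc:
            if attempt == max_attempts:
                raise Exception("RETRY FAILED")
            # Simple linear backoff before retrying the merge.
            time.sleep(backoff_s * attempt)
```

In the notebook you would then pass the real exception type and the merge as a callable, roughly:

    from delta.exceptions import ConcurrentAppendException
    merge_with_retry(lambda: curatedTable.alias("staged").merge(...).whenMatchedUpdate(...).execute(),
                     ConcurrentAppendException)

Catching only ConcurrentAppendException means unrelated failures still surface immediately instead of being retried.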
