我有Airflow正在Kubernetes中使用CeleryExecutor运行。Airflow使用DatabricksOperator提交并监控Spark作业。
我的流式Spark作业有很长的运行时间(除非失败或被取消,否则它们将永远运行(。当Airflow工作人员的吊舱在流作业运行时被杀死时,会发生以下情况:
- 关联的任务变成僵尸(运行状态,但没有带心跳的进程(
- 当气流捕获僵尸时,任务被标记为失败
- Spark流作业继续运行
如何在Spark作业关闭之前强制它终止
我试过用TERM信号杀死Celery工作人员,但显然这会导致Celery停止接受新任务,等待当前任务完成(文档(。
您需要更加清楚这个问题。如果你说spark集群按预期完成了作业,而没有调用on_kill函数,那就是预期行为。根据文档,kill功能是在任务被杀死后进行清理。
def on_kill(self) -> None:
"""
Override this method to cleanup subprocesses when a task instance
gets killed. Any use of the threading, subprocess or multiprocessing
module within an operator needs to be cleaned up or it will leave
ghost processes behind.
"""
在你的情况下,当你手动终止工作时,它正在做它必须做的事情。
现在,如果您希望在成功完成作业后也有clean_up,请重写post_execute函数。根据文件。后执行是
def post_execute(self, context: Any, result: Any = None):
"""
This hook is triggered right after self.execute() is called.
It is passed the execution context and any results returned by the
operator.
"""