Airflow—在长时间运行的任务中持久化任务状态/检查点信息



我有一个任务可以运行10多个小时,比如在for循环中。

我想存储检查点在每次循环执行后,如果任务中出现错误或worker崩溃,则重试的任务可以通过检索特定于该任务运行的检查点信息从它离开的位置恢复。

那么,问题是,如何以及在哪里存储这个检查点信息?

任务逻辑如下:-

long_running_task:
seqNo = getStoredCheckpointForTask() 
do
if(seqNo == null )        
seqNo = getFirstSequenceFromSomeSource() //1-2 seconds

doSomething(seqNo);  //3-4 seconds
seqNo = getNextSequenceFromSomeSource(oldSeq: seqNo) //1-2 seconds
storeCheckpointForTask (seqNo);
while sequence != null

如果您的任务在气流工人上运行,您有两个选择:

  • 您可以使用外部存储系统作为检查点存储(例如S3):
  • 或使用气流元数据(气流DB)作为检查点存储保存一个xcom
def my_task_func(**context):
seqNo = context["ti"].xcom_pull(key=f"checkpoint_{context['execution_date']}", default=None)
while True:
if not seqNo:
seqNo = getFirstSequenceFromSomeSource()
doSomething(seqNo)
seqNo = getNextSequenceFromSomeSource(seqNo)
if not seqNo:
break
context["ti"].xcom_push(key=f"checkpoint_{context['execution_date']}", seqNo)

如果您的任务在气流外运行,则第一个选项仍然有效,但第二个选项无效,并且您将有其他选项取决于您如何运行任务(docker的卷,K8S的PVC,…)。

最新更新