重新运行时气流是否缓存全局变量



>我有一个气流作业,如下所示:

import time
job_id = int(time.time())
airflow_job1 = PythonOperator(op_kwargs={"job_id" : job_id}, ...)
airflow_job2 = BashOperator(op_kwargs={"job_id" : job_id}, ...)
airflow_job1 >> airflow_job2

我知道每次脚本启动时,我都会有一个新job_id,用于每个气流任务。但是我想知道如果我从中间运行脚本,比如airflow_job1失败了,我修复了问题并从 UI 中的airflow_job1重新运行,是在重新运行中生成了新job_id,还是 Airflow 使用之前的最后一job_id?

实际上,在我用一个简单的案例检查之后:

# global parameter
job_id = int(time.time())

def airflow_job1(job_id, **context):
print("in airflow_job1, current timestamp: %s" % job_id)
def airflow_job2(job_id, **context):
print("in airflow_job2, current timestamp: %s" % job_id)
airflow_job1 = PythonOperator(
task_id='airflow_job1',
provide_context=True,
python_callable=airflow_job1,
op_kwargs={'job_id': job_id},
dag=globals()[dag_name]
)
airflow_job2 = PythonOperator(
task_id='airflow_job2',
provide_context=True,
python_callable=airflow_job2,
op_kwargs={'job_id': job_id},
dag=globals()[dag_name]
)
airflow_job1 >> airflow_job2

我发现job_id airflow_job1,即使在同一次运行中,airflow_job2也是不同的。

所以结论是,我们不应该以这种方式设置全局参数,也许用xcom_pull/xcom_push来解决

最新更新