我定义了一个全局变量timestamp
,其中有多个操作符引用该变量。似乎这个变量是重新定义时,每个操作符运行?下面是一个最小的可复制示例。我期望test
和test2
将打印相同的时间戳,但它们在气流中打印不同的时间戳(间隔秒)。
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
start_date = datetime.datetime(
year=2022,
month=3,
day=30,
hour=18,
minute=0,
)
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
def test():
print(timestamp)
def test2():
print(timestamp)
with DAG(
'airflow_test',
description='airflow test',
max_active_runs=1,
start_date=start_date,
) as dag:
test = PythonOperator(
task_id='test',
python_callable=test,
dag=dag
)
test2 = PythonOperator(
task_id='test2',
python_callable=test2,
dag=dag
)
test >> test2
当这个脚本由气流运行时,导致这种情况发生时,内部实际发生了什么?
我更喜欢使用默认内置的气流,如:
from airflow.utils.dates import days_ago
的例子:
project_cfg = {
'owner': 'airflow',
'email': ['your-email@example.com'],
'email_on_failure': True,
'start_date': days_ago(1),
'retries': 1,
'retry_delay': timedelta(hours=1),
}
对于你的问题,我更喜欢用xcom来引用你的全局变量另一种方法是创建一个函数并为每个任务调用它或创建一个局部变量
或者试试这个:
with DAG(dag_id="test",
start_date=days_ago(3),
schedule_interval="@daily",
catchup=False) as dag:
timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
@task
def test1():
return timestamp
@task
def test2():
return timestamp
test2(test1())
我希望它能帮助你。