Apache气流中的Python变量不保存数据



下面的代码在终端中运行时运行正常:

data = []
def _step_one():
for i in range(10):
data.append(i)
def _step_two():
print(f'data: {data}')
_step_one()
_step_two()

打印data: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

但是当我在Apache气流上运行相同的data是空的[]作为第一次初始化

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago
from datetime import timedelta
data = []
def _step_one():
for i in range(10):
data.append(i)
def _step_two():
print(f'data: {data}')
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
dag_id='test_dag',
default_args=default_args,
description='Test DAG',
schedule_interval=timedelta(minutes=3)
)
step_one = PythonOperator(
task_id='step_one',
python_callable=_step_one,
dag=dag
)
step_two = PythonOperator(
task_id='step_two',
python_callable=_step_two,
dag=dag
)
final_step = BashOperator(
task_id='notify',
bash_command='echo "Operation Completed!"',
dag=dag
)
step_one >> step_two >> final_step

我是Python的新手,所以我可能犯了一个新手错误,你知道我做错了什么吗?

不能用这种方式在任务之间传递数据。

由于您的任务可以在不同的机器上运行,因此它们不共享变量。如果你想在任务之间传递数据,你应该使用XCom:

https://airflow.apache.org/docs/apache-airflow/2.0.1/concepts.html xcoms

最新更新