当任务名称和任务 ID 不匹配时,为什么气流会引发异常?



从气流文档和阅读互联网上的不同站点来看,很明显,在使用运算符创建任务时,任务名称和task_id不需要匹配。例如,我使用以下代码:

from airflow import DAG
from airflow.utils import timezone
from airflow.operators.python_operator import PythonOperator
from pprint import pprint
default_args = {
'owner':'me',
'start_date': timezone.datetime(2019,1,1), 
'provide_context': True
}
dag = DAG(
'etl', # dag_id
default_args=default_args,
schedule_interval=None,
max_active_runs=2,
catchup=False
)
def some_function(**context):
start_time = default_args['start_date']
print(start_time )
def another_function(**context):
start_time = default_args['start_date']
print(start_time )
with dag:
some_task_name = PythonOperator(
task_id='some_task_name_id',
python_callable=some_function,
)
another_task_name = PythonOperator(
task_id='another_task_name_id',
python_callable=another_function,
)
some_task_name >> another_task_name

我正在使用以下命令进行测试:

气流测试etl_script some_task_name"参数">

并收到以下错误:

Traceback (most recent call last):
File "~/airflow/.venv/bin/airflow", line 32, in <module>
args.func(args)
File "~/airflow/.venv/lib/python3.6/site-packages/airflow/utils/cli.py", line 74, in wrapper
return f(*args, **kwargs)
File "~/airflow/.venv/lib/python3.6/site-packages/airflow/bin/cli.py", line 648, in test
task = dag.get_task(task_id=args.task_id)
File "~/airflow/.venv/lib/python3.6/site-packages/airflow/models/dag.py", line 1070, in get_task
raise AirflowException("Task {task_id} not found".format(task_id=task_id))
airflow.exceptions.AirflowException: Task some_task_name not found

但是如果我使任务名称和ID相同,它可以工作。为什么会这样?我觉得我错过了一些东西,但无法弄清楚。谁能洒一些光?

环境: 气流版本:1.10.5,Ubuntu 18.04,Python 3.6

发生错误不是因为task_id和名称不同,而是因为我使用任务名称而不是测试命令task_id。从服务器命令行测试 dag 任务时,命令布局为:

气流测试dag_id task_id日期

我想最好保持任务名称和ID相同。

最新更新