dag.py 提出:"airflow.exceptions.AirflowException: Task is missing the start_date parameter",但它在代码中给出



我今天尝试创建我的第一个气流DAG:

from datetime import timedelta
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
default_args = {
'owner': 'default_user',
'start_date': days_ago(2),
'depends_on_past': True,
# With this set to true, the pipeline won't run if the previous day failed
'email': ['demo@email.de'],
'email_on_failure': True,
# upon failure this pipeline will send an email to your email set above
'email_on_retry': False,
'retries': 5,
'retry_delay': timedelta(minutes=30),
}
dag = DAG(
'basic_dag_2',
default_args=default_args,
schedule_interval=timedelta(days=1),
)

def my_func():
print('Hello from my_func')

bashtask = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag,
)
dummy_task = DummyOperator(task_id='dummy_task', retries=3)
python_task = PythonOperator(task_id='python_task', python_callable=my_func)
dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)

我的气流在 Python3.6.8 上正常运行,但是当我尝试将 dagbag 导入气流时,它会抛出此异常,我不知道为什么:

[2020-05-11 17:11:15,601] {scheduler_job.py:1576} WARNING - No viable dags retrieved from /root/airflow/dags/first_dag.py
[2020-05-11 17:11:15,616] {scheduler_job.py:162} INFO - Processing /root/airflow/dags/first_dag.py took 0.031 seconds
[2020-05-11 17:12:05,647] {scheduler_job.py:154} INFO - Started process (PID=26569) to work on /root/airflow/dags/first_dag.py
[2020-05-11 17:12:05,653] {scheduler_job.py:1562} INFO - Processing file /root/airflow/dags/first_dag.py for tasks to queue
[2020-05-11 17:12:05,654] {logging_mixin.py:112} INFO - [2020-05-11 17:12:05,654] {dagbag.py:396} INFO - Filling up the DagBag from /root/airflow/dags/first_dag.py
[2020-05-11 17:12:05,666] {logging_mixin.py:112} INFO - [2020-05-11 17:12:05,662] {dagbag.py:239} ERROR - Failed to import: /root/airflow/dags/first_dag.py
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 236, in process_file
m = imp.load_source(mod_name, filepath)
File "/usr/lib64/python3.6/imp.py", line 172, in load_source
module = _load(spec)
File "<frozen importlib._bootstrap>", line 684, in _load
File "<frozen importlib._bootstrap>", line 665, in _load_unlocked
File "<frozen importlib._bootstrap_external>", line 678, in exec_module
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/root/airflow/dags/first_dag.py", line 34, in <module>
dag=dag,
File "/usr/local/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/bash_operator.py", line 70, in __init__
super(BashOperator, self).__init__(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/airflow/utils/decorators.py", line 98, in wrapper
result = func(*args, **kwargs)
File "/usr/local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 422, in __init__
self.dag = dag
File "/usr/local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 548, in dag
dag.add_task(self)
File "/usr/local/lib/python3.6/site-packages/airflow/models/dag.py", line 1301, in add_task
raise AirflowException("Task is missing the start_date parameter")
airflow.exceptions.AirflowException: Task is missing the start_date parameter

我认为我也应该给我的操作员一个start_date,但他们也应该使用其 DAG 中的日期。

这是因为您的两个任务尚未分配给包含default_args中的start_date的 DAG。

dummy_task = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
python_task = PythonOperator(task_id='python_task', python_callable=my_func, dag=dag)

请注意,您可以使用 DAG 对象作为上下文管理器,如 https://airflow.apache.org/docs/stable/concepts.html#context-manager 中所述,以避免对所有任务重复dag=dag

例:

with DAG(
'basic_dag_2',
default_args=default_args,
schedule_interval=timedelta(days=1),
) as dag:
bashtask = BashOperator(
task_id='print_date',
bash_command='date',
)
dummy_task = DummyOperator(task_id='dummy_task', retries=3)
python_task = PythonOperator(task_id='python_task', python_callable=my_func)
dummy_task.set_downstream(bashtask)
python_task.set_downstream(bashtask)

有同样的问题,您只需要在您使用的每个运算符中放入dag=dag。 因为运算符仍然需要很少的参数才能作为任务运行,并且这些参数在 DAG 部分中定义,然后 TASK 才能运行。

举个例子: -这是错误的:

postgres_task_1 = PostgresOperator(
task_id="get_param_2",
postgres_conn_id="aramis_postgres_connection",
sql="""
SELECT  param_num_2 FROM public.aramis_meta_task
""",
)

-这是正确的:

postgres_task_1 = PostgresOperator(
dag=dag,
task_id="get_param_2",
postgres_conn_id="aramis_postgres_connection",
sql="""
SELECT  param_num_2 FROM public.aramis_meta_task
""",
)

最新更新