气流DAG卡在运行状态



我创建了一个dag,并每天安排它。它每天都会排队,但任务实际上并没有运行。这个问题过去已经在这里提出了,但答案对我没有帮助,所以似乎还有另一个问题。

下面分享了我的代码。我用注释替换了任务t2的SQL。当我使用"气流测试…"在CLI上单独运行它们时,每个任务都能成功运行。

你能解释一下应该做些什么来运行DAG吗?谢谢

这是DAG代码:

from datetime import timedelta, datetime
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

default_args = {
'owner' : 'me',
'depends_on_past' : 'true',
'start_date' : datetime(2018, 06, 25),
'email' : ['myemail@moovit.com'],
'email_on_failure':True,
'email_on_retry':False,
'retries' : 2,
'retry_delay' : timedelta(minutes=5)
}

dag = DAG('my_agg_table',
default_args = default_args,
schedule_interval = "30 4 * * *"
)

t1 = BigQueryOperator(
task_id='bq_delete_my_agg_table',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
bql='''
delete `my_project.agg.my_agg_table`
where date = '{{ macros.ds_add(ds, -1)}}'
''',
dag=dag)
t2 = BigQueryOperator(
task_id='bq_insert_my_agg_table',
use_legacy_sql=False,
write_disposition='WRITE_APPEND',
allow_large_results=True,
bql='''
#standardSQL
Select ... the query continue here.....
''',    destination_dataset_table='my_project.agg.my_agg_table',
dag=dag)

t1 >> t2

通常很容易找到任务未运行的原因。在气流web UI中时:

  • 选择任何感兴趣的DAG
  • 现在单击任务
  • 再次单击Task Instance Details
  • 在第一行有一个面板Task Instance State
  • 在旁边的框Reason中,是运行任务的原因,或者忽略任务的原因

检查第一个没有执行的任务通常是有意义的,因为我看到您设置了depends_on_past=True,如果在错误的场景中使用,可能会导致问题。

更多信息请点击此处:Airflow 1.9.0正在排队,但没有启动任务

最新更新