Cloud Composer气流任务失败,但功能成功完成



我正在尝试使用Cloud Composer编写我的第一个气流作业。我的DAG有三个任务,第一个任务成功完成,但是第二个任务似乎在发出任何故障错误消息时失败了。我在第二个任务中使用PythonOperator。称为该功能执行长期运行查询和轮询,直到查询完成为止。查询完成后,我会收到一条消息,说数据已输出到正确的表格,但是气流将任务视为失败并再次重新检索任务。

我的 default_args for dag看起来像这样:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': today.strftime("%Y-%m-%d"),
    'email': ['email@email.com'],
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=1),
    'dagrun_timeout': timedelta(minutes=30)
}

编辑:

这是我的Python可召唤和pythonoperator。run_query可呼叫可在StackDriver日志中产生输出,并指示实际功能完成,但任务失败。

def run_query(**kwargs):
    ti = kwargs['ti']
    creds = ti.xcom_pull(key='key value 1', task_ids=t1_id)
    service = adh.get_service(creds)
    return adh.start_saved_query(service,
                                 kwargs['customer_id'],
                                 kwargs['query_name'],
                                 kwargs['start_date'],
                                 kwargs['end_date'],
                                 kwargs['project'],
                                 kwargs['dataset'],
                                 kwargs['table'],
                                 parameters=kwargs['parameters'])
run_adh_query = PythonOperator(
    task_id="task2",
    provide_context=True,
    python_callable=run_query,
    dag=dag,
    trigger_rule='all_success',
    op_kwargs={
        'customer_id': 01234,
        'query_name': 'queryName',
        'start_date': start_date.strftime("%Y-%m-%d"),
        'end_date': end_date.strftime("%Y-%m-%d"),
        'project': adh_project,
        'dataset': adh_dataset,
        'table': adh_table,
        'parameters': {
        'CONV_START_DATE': {'value': conv_start_date.strftime("%Y-%m-%d")},
        'CONV_END_DATE': {'value': end_date.strftime("%Y-%m-%d")},
        'LOOKBACK_DAYS': {'value': str(lookback_days)}
        }
    }
)

我非常感谢任何提示!

我在GCP Cloud Composer [Composer-1.11.0-AirFlow-1.10.9]中面临相同的问题。

对于长期运行的任务,有很高的概率(尤其是在使用KuberNetespoderator时),可以将任务标记为AirFlow Scheduler。

解决方案: -

i将调度程序中的Scheduler_zombie_task_task_threshold配置参数值从300(默认5分钟)增加到1800(30分钟)。此后,我能够运行任务长达45分钟,而不会以失败状态结束。

如何更改参数 -

  1. 转到GCP Cloud Console。
  2. 导航到作曲家。
  3. 打开作曲家。
  4. 转到气流配置覆盖选项卡
  5. 输入值:Scheduler Scheduler_zombie_task_threshold 1800

我在您的代码中看不到任何错误处理。

长期运行查询和民意调查失败,提高AirFlowException,这将导致任务立即移动到故障状态。

from airflow import AirflowException

valueerror可用于失败和重试

最新更新