使用 ExternalTaskSensor for DAG 任务依赖关系时运行缓慢的气流 1.10.2 ETL?



我有两个DAG需要与Airflow 1.10.2 + CeleryExecutor一起运行。第一个 DAG (DAG1) 是从 s3 到 Redshift (3+ 小时)的长时间运行的数据加载。我的第二个 DAG (DAG2) 对 DAG1 加载的数据执行计算。我想在DAG2中包含ExternalTaskSensor,以便在数据加载后可靠地执行计算。理论上这么简单!

我可以通过确保两个 DAG 计划同时启动(两个 DAG 的 schedule="0 8 * * *")并且 DAG2 依赖于 DAG1 中的最终任务,成功地让 DAG2 等待 DAG1 完成。但是当我引入传感器时,我看到我们在 DAG1 上的 ETL 出现了巨大的延迟。起初,这是因为我的原始实现使用了mode="poke",据我了解,这会锁定工作线程。但是,即使我在文档中阅读 https://airflow.readthedocs.io/en/stable/_modules/airflow/sensors/base_sensor_operator.html 将其更改为mode="reschedule",我仍然看到大量的 ETL 延迟。

我在DAG2中使用下面的ExternalTaskSensor代码:

wait_for_data_load = ExternalTaskSensor(
dag=dag,
task_id="wait_for_data_load",
external_dag_id="dag1",
external_task_id="dag1_final_task_id",
mode="reschedule",
poke_interval=1800,  # check every 30 min
timeout=43200,  # timeout after 12 hours (catch delayed data load runs)
soft_fail=False  # if the task fails, we assume a failure
)

如果代码工作正常,我希望传感器执行快速检查 DAG1 是否已完成,如果没有,则按照poke_interval定义重新安排 30 分钟的时间,不会延迟 DAG1 ETL。如果 DAG1 在 12 小时后无法完成,则 DAG2 将停止戳并失败。

相反,我经常收到 DAG1 中每个任务的错误,说(例如)Executor reports task instance <TaskInstance: dag1.data_table_temp_redshift_load 2019-05-20 08:00:00+00:00 [queued]> finished (failed) although the task says its queued. Was the task killed externally?即使任务成功完成(有一些延迟)。就在发送此错误之前,我在哨兵日志中看到一行Executor reports dag1.data_table_temp_redshift_load execution_date=2019-05-20 08:00:00+00:00 as failed for try_number 1尽管(再次)我可以看到任务成功。

DAG2上的日志看起来也有点奇怪。我看到以相同的时间间隔记录的重复尝试,如下所述:

--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------
[2019-05-21 08:01:48,417] {{models.py:1593}} INFO - Executing <Task(ExternalTaskSensor): wait_for_data_load> on 2019-05-20T08:00:00+00:00
[2019-05-21 08:01:48,419] {{base_task_runner.py:118}} INFO - Running: ['bash', '-c', 'airflow run dag2 wait_for_data_load 2019-05-20T08:00:00+00:00 --job_id 572075 --raw -sd DAGS_FOLDER/dag2.py --cfg_path /tmp/tmp4g2_27c7']
[2019-05-21 08:02:02,543] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:02,542] {{settings.py:174}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=28219
[2019-05-21 08:02:12,000] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:11,996] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-05-21 08:02:15,840] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:15,827] {{models.py:273}} INFO - Filling up the DagBag from /usr/local/airflow/dags/dag2.py
[2019-05-21 08:02:16,746] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:16,745] {{dag2.py:40}} INFO - Waiting for the dag1_final_task_id operator to complete in the dag1 DAG
[2019-05-21 08:02:17,199] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:17,198] {{cli.py:520}} INFO - Running <TaskInstance: dag1. wait_for_data_load 2019-05-20T08:00:00+00:00 [running]> on host 11d93b0b0c2d
[2019-05-21 08:02:17,708] {{external_task_sensor.py:91}} INFO - Poking for dag1. dag1_final_task_id on 2019-05-20T08:00:00+00:00 ... 
[2019-05-21 08:02:17,890] {{models.py:1784}} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2019-05-21 08:02:17,892] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load /usr/local/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.25.2) or chardet (3.0.4) doesn't match a supported version!
[2019-05-21 08:02:17,893] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load   RequestsDependencyWarning)
[2019-05-21 08:02:17,893] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load /usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2019-05-21 08:02:17,894] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load   """)
[2019-05-21 08:02:22,597] {{logging_mixin.py:95}} INFO - [2019-05-21 08:02:22,589] {{jobs.py:2527}} INFO - Task exited with return code 0
[2019-05-21 08:01:48,125] {{models.py:1359}} INFO - Dependencies all met for <TaskInstance: dag2. wait_for_data_load 2019-05-20T08:00:00+00:00 [queued]>
[2019-05-21 08:01:48,311] {{models.py:1359}} INFO - Dependencies all met for <TaskInstance: dag2. wait_for_data_load 2019-05-20T08:00:00+00:00 [queued]>
[2019-05-21 08:01:48,311] {{models.py:1571}} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------
[2019-05-21 08:01:48,417] {{models.py:1593}} INFO - Executing <Task(ExternalTaskSensor): wait_for_data_load> on 2019-05-20T08:00:00+00:00
[2019-05-21 08:01:48,419] {{base_task_runner.py:118}} INFO - Running: ['bash', '-c', 'airflow run dag2 wait_for_data_load 2019-05-20T08:00:00+00:00 --job_id 572075 --raw -sd DAGS_FOLDER/dag2.py --cfg_path /tmp/tmp4g2_27c7']
[2019-05-21 08:02:02,543] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:02,542] {{settings.py:174}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=28219
[2019-05-21 08:02:12,000] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:11,996] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-05-21 08:02:15,840] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:15,827] {{models.py:273}} INFO - Filling up the DagBag from /usr/local/airflow/dags/dag2.py
[2019-05-21 08:02:16,746] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:16,745] {{dag2.py:40}} INFO - Waiting for the dag1_final_task_id operator to complete in the dag1 DAG
[2019-05-21 08:02:17,199] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load [2019-05-21 08:02:17,198] {{cli.py:520}} INFO - Running <TaskInstance: dag2.wait_for_data_load 2019-05-20T08:00:00+00:00 [running]> on host 11d93b0b0c2d
[2019-05-21 08:02:17,708] {{external_task_sensor.py:91}} INFO - Poking for dag1.dag1_final_task_id on 2019-05-20T08:00:00+00:00 ... 
[2019-05-21 08:02:17,890] {{models.py:1784}} INFO - Rescheduling task, marking task as UP_FOR_RESCHEDULE
[2019-05-21 08:02:17,892] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load /usr/local/lib/python3.6/site-packages/requests/__init__.py:91: RequestsDependencyWarning: urllib3 (1.25.2) or chardet (3.0.4) doesn't match a supported version!
[2019-05-21 08:02:17,893] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load   RequestsDependencyWarning)
[2019-05-21 08:02:17,893] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load /usr/local/lib/python3.6/site-packages/psycopg2/__init__.py:144: UserWarning: The psycopg2 wheel package will be renamed from release 2.8; in order to keep installing from binary please use "pip install psycopg2-binary" instead. For details see: <http://initd.org/psycopg/docs/install.html#binary-install-from-pypi>.
[2019-05-21 08:02:17,894] {{base_task_runner.py:101}} INFO - Job 572075: Subtask wait_for_data_load   """)
[2019-05-21 08:02:22,597] {{logging_mixin.py:95}} INFO - [2019-05-21 08:02:22,589] {{jobs.py:2527}} INFO - Task exited with return code 0
[2019-05-21 08:33:31,875] {{models.py:1359}} INFO - Dependencies all met for <TaskInstance: dag2.wait_for_data_load 2019-05-20T08:00:00+00:00 [queued]>
[2019-05-21 08:33:31,903] {{models.py:1359}} INFO - Dependencies all met for <TaskInstance: dag2.wait_for_data_load 2019-05-20T08:00:00+00:00 [queued]>
[2019-05-21 08:33:31,903] {{models.py:1571}} INFO - 
--------------------------------------------------------------------------------
Starting attempt 1 of 4
--------------------------------------------------------------------------------

虽然所有日志都说Starting attempt 1 of 4,但我确实看到大约每 30 分钟一次的尝试记录,但我看到每个时间间隔有多个日志(每 10 分钟间隔打印 30+ 个相同的日志)。

通过搜索,我看到其他人正在生产流程中使用传感器 https://eng.lyft.com/running-apache-airflow-at-lyft-6e53bb8fccff,这让我认为有办法解决这个问题,或者我正在实施错误的东西。但是我也在气流项目中看到了与此问题相关的未决问题,所以也许项目中还有更深层次的问题?我还在这里找到了一个相关但未回答的帖子 Apache Airflow 1.10.3:执行器报告任务实例???已完成(失败),尽管任务显示其已排队。任务是否在外部被杀死?

此外,我们还使用以下配置设置:

# The amount of parallelism as a setting to the executor. This defines
# the max number of task instances that should run simultaneously
# on this airflow installation
parallelism = 32
# The number of task instances allowed to run concurrently by the scheduler
dag_concurrency = 16
# Are DAGs paused by default at creation
dags_are_paused_at_creation = True
# When not using pools, tasks are run in the "default pool",
# whose size is guided by this config element
non_pooled_task_slot_count = 128
# The maximum number of active DAG runs per DAG
max_active_runs_per_dag = 16

这些症状实际上是由 DAG1 正文中对Variable.set()的调用引起的,DAG2 随后使用该调用来检索动态生成的dag_id DAG1。Variable.set()都导致了错误(在工作线程日志中发现)。如此处所述,计划程序使用每个检测信号轮询 DAG 定义,以更新 DAG 以使 DAG 保持最新。这意味着每个心跳都会出错,从而导致较大的 ETL 延迟。

最新更新