Airflow Celery workers crash and cannot complete tasks



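I have set up a dockerized Airflow environment running:

  • Airflow webserver
  • Airflow scheduler
  • 2 Airflow workers (though the issue is reproducible with only 1 worker)
  • Redis

That is 6 images in total, on 4 t2.small EC2 instances in a single ECS cluster, with a db.t2.micro PostgreSQL RDS instance.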

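With the CeleryExecutor, almost every queued task sent to the workers fails. On receiving tasks, the workers appear to lose communication with each other and/or the scheduler: they drift apart, miss heartbeats, and are eventually forcibly killed by the host system.

I can reproduce this behavior on Airflow 1.10.3 (and the latest 1.10.4RC) with the latest versions of both Redis and RabbitMQ, and with Celery 4.3.0.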

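I have padded out the suggested configuration options, including:

  • scheduler__scheduler_heartbeat_sec (currently 180 seconds)
  • scheduler__job_heartbeat_sec (currently the default of 5 seconds)
  • scheduler__max_threads (currently just 1 thread)
  • celery_broker_transport_options__visibility_timeout (currently 21600 seconds)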
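
For reference, in a Docker setup these options are typically passed as environment variables, which Airflow reads in the form AIRFLOW__{SECTION}__{KEY}. The snippet below is only a sketch of that mapping using the values quoted in the list above; the helper name `airflow_env_var` is made up for illustration and is not part of Airflow.

```python
def airflow_env_var(section: str, key: str) -> str:
    """Build the environment variable name Airflow reads for a config option.

    Hypothetical helper: it only applies the AIRFLOW__{SECTION}__{KEY} convention.
    """
    return "AIRFLOW__{}__{}".format(section.upper(), key.upper())


# (section, key) -> value, taken from the list in the question.
settings = {
    ("scheduler", "scheduler_heartbeat_sec"): "180",
    ("scheduler", "job_heartbeat_sec"): "5",
    ("scheduler", "max_threads"): "1",
    ("celery_broker_transport_options", "visibility_timeout"): "21600",
}

env = {airflow_env_var(s, k): v for (s, k), v in settings.items()}

# Print in KEY=value form, e.g. for an env file or a container definition.
for name, value in sorted(env.items()):
    print(f"{name}={value}")
```

None of these settings turned out to be the cause, so the values are shown only to make the configuration under test explicit.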

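Below is a DAG run that executes 5 SQL queries which set permissions across schemas.

  • Running these queries manually takes seconds
  • The LocalExecutor in a non-dockerized environment runs the DAG in ~30 seconds.
  • In this new Docker environment, the CeleryExecutor is still attempting the first try of each task after ~300 seconds.

Scheduler: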


[2019-07-29 01:20:23,407] {{jobs.py:1106}} INFO - 5 tasks up for execution:
<TaskInstance: ldw_reset_permissions.service_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.marketing_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.finance_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.engineering_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.bi_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
[2019-07-29 01:20:23,414] {{jobs.py:1144}} INFO - Figuring out tasks to run in Pool(name=None) with 128 open slots and 5 task instances in queue
[2019-07-29 01:20:23,418] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 0/16 running and queued tasks
[2019-07-29 01:20:23,418] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 1/16 running and queued tasks
[2019-07-29 01:20:23,418] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 2/16 running and queued tasks
[2019-07-29 01:20:23,422] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 3/16 running and queued tasks
[2019-07-29 01:20:23,422] {{jobs.py:1182}} INFO - DAG ldw_reset_permissions has 4/16 running and queued tasks
[2019-07-29 01:20:23,423] {{jobs.py:1223}} INFO - Setting the follow tasks to queued state:
<TaskInstance: ldw_reset_permissions.service_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.marketing_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.finance_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.engineering_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
<TaskInstance: ldw_reset_permissions.bi_readers 2019-07-29 01:20:17.300679+00:00 [scheduled]>
[2019-07-29 01:20:23,440] {{jobs.py:1298}} INFO - Setting the following 5 tasks to queued state:
<TaskInstance: ldw_reset_permissions.service_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
<TaskInstance: ldw_reset_permissions.marketing_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
<TaskInstance: ldw_reset_permissions.finance_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
<TaskInstance: ldw_reset_permissions.engineering_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
<TaskInstance: ldw_reset_permissions.bi_readers 2019-07-29 01:20:17.300679+00:00 [queued]>
[2019-07-29 01:20:23,440] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'service_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,444] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'service_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,445] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'marketing_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,446] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'marketing_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,446] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'finance_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,446] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'finance_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,446] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'engineering_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,447] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'engineering_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,447] {{jobs.py:1334}} INFO - Sending ('ldw_reset_permissions', 'bi_readers', datetime.datetime(2019, 7, 29, 1, 20, 17, 300679, tzinfo=<TimezoneInfo [UTC, GMT, +00:00:00, STD]>), 1) to executor with priority 1 and queue default
[2019-07-29 01:20:23,447] {{base_executor.py:59}} INFO - Adding to queue: ['airflow', 'run', 'ldw_reset_permissions', 'bi_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:21:25,589] {{jobs.py:1468}} INFO - Executor reports execution of ldw_reset_permissions.marketing_readers execution_date=2019-07-29 01:20:17.300679+00:00 exited with status failed for try_number 1
[2019-07-29 01:21:25,599] {{jobs.py:1468}} INFO - Executor reports execution of ldw_reset_permissions.engineering_readers execution_date=2019-07-29 01:20:17.300679+00:00 exited with status failed for try_number 1
[2019-07-29 01:21:56,111] {{jobs.py:1468}} INFO - Executor reports execution of ldw_reset_permissions.service_readers execution_date=2019-07-29 01:20:17.300679+00:00 exited with status failed for try_number 1
[2019-07-29 01:22:28,133] {{jobs.py:1468}} INFO - Executor reports execution of ldw_reset_permissions.bi_readers execution_date=2019-07-29 01:20:17.300679+00:00 exited with status failed for try_number 1

Worker 1:

[2019-07-29 01:20:23,593: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[cb066498-e350-43c1-a23d-1bc33929717a]
[2019-07-29 01:20:23,605: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'service_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,627: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[d835c30a-e2bd-4f78-b291-d19b7bccad68]
[2019-07-29 01:20:23,637: INFO/ForkPoolWorker-1] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'finance_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:25,260] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=44
[2019-07-29 01:20:25,263] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=45
[2019-07-29 01:20:25,878] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:25,881] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:26,271] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:26,276] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:26,601] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.finance_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host b4b0a799a7ca
[2019-07-29 01:20:26,604] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.service_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host b4b0a799a7ca
[2019-07-29 01:20:39,364: INFO/MainProcess] missed heartbeat from celery@0f9db941bdd7
[2019-07-29 01:21:46,121: WARNING/MainProcess] Substantial drift from celery@0f9db941bdd7 may mean clocks are out of sync.  Current drift is
70 seconds.  [orig: 2019-07-29 01:21:46.117058 recv: 2019-07-29 01:20:36.485961]
[2019-07-29 01:21:46,127: ERROR/MainProcess] Process 'ForkPoolWorker-15' pid:42 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:21:46,294: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
[2019-07-29 01:21:49,853: ERROR/MainProcess] Process 'ForkPoolWorker-17' pid:62 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:29,230: ERROR/MainProcess] Process 'ForkPoolWorker-18' pid:63 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:44,002: INFO/MainProcess] missed heartbeat from celery@0f9db941bdd7
[2019-07-29 01:22:52,073: ERROR/MainProcess] Process 'ForkPoolWorker-19' pid:64 exited with 'signal 9 (SIGKILL)'

Worker 2:

[2019-07-29 01:20:23,605: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[dbb9b813-255e-4284-b067-22b990d8b9a2]
[2019-07-29 01:20:23,609: INFO/ForkPoolWorker-15] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'marketing_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,616: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[42ee3e3a-620e-47da-add2-e5678973d87e]
[2019-07-29 01:20:23,622: INFO/ForkPoolWorker-1] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'engineering_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:23,632: INFO/MainProcess] Received task: airflow.executors.celery_executor.execute_command[be609901-60bc-4dcc-9374-7c802171f2db]
[2019-07-29 01:20:23,638: INFO/ForkPoolWorker-3] Executing command in Celery: ['airflow', 'run', 'ldw_reset_permissions', 'bi_readers', '2019-07-29T01:20:17.300679+00:00', '--local', '-sd', '/usr/local/airflow/dags/ldw_reset_permissions.py']
[2019-07-29 01:20:26,124] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=45
[2019-07-29 01:20:26,127] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=46
[2019-07-29 01:20:26,135] {{settings.py:182}} INFO - settings.configure_orm(): Using pool settings. pool_size=5, pool_recycle=1800, pid=44
[2019-07-29 01:20:27,025] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:27,033] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:27,047] {{__init__.py:51}} INFO - Using executor CeleryExecutor
[2019-07-29 01:20:27,798] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:27,801] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:27,806] {{__init__.py:305}} INFO - Filling up the DagBag from /usr/local/airflow/dags/ldw_reset_permissions.py
[2019-07-29 01:20:28,426] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.engineering_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host 0f9db941bdd7
[2019-07-29 01:20:28,426] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.marketing_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host 0f9db941bdd7
[2019-07-29 01:20:28,437] {{cli.py:517}} INFO - Running <TaskInstance: ldw_reset_permissions.bi_readers 2019-07-29T01:20:17.300679+00:00 [queued]> on host 0f9db941bdd7
[2019-07-29 01:20:56,752: INFO/MainProcess] missed heartbeat from celery@b4b0a799a7ca
[2019-07-29 01:20:56,764: ERROR/MainProcess] Process 'ForkPoolWorker-15' pid:42 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:20:56,903: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
[2019-07-29 01:20:57,623: WARNING/MainProcess] Substantial drift from celery@b4b0a799a7ca may mean clocks are out of sync.  Current drift is
25 seconds.  [orig: 2019-07-29 01:20:57.622959 recv: 2019-07-29 01:20:32.629294]
[2019-07-29 01:20:57,631: ERROR/MainProcess] Process 'ForkPoolWorker-1' pid:24 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:20:57,837: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
[2019-07-29 01:20:58,513: ERROR/MainProcess] Process 'ForkPoolWorker-17' pid:65 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:23,076: INFO/MainProcess] missed heartbeat from celery@b4b0a799a7ca
[2019-07-29 01:22:23,089: ERROR/MainProcess] Process 'ForkPoolWorker-19' pid:67 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:23,105: ERROR/MainProcess] Process 'ForkPoolWorker-18' pid:66 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:23,116: ERROR/MainProcess] Process 'ForkPoolWorker-3' pid:26 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:23,191: ERROR/MainProcess] Task handler raised error: WorkerLostError('Worker exited prematurely: signal 9 (SIGKILL).',)
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/billiard/pool.py", line 1223, in mark_as_worker_lost
human_status(exitcode)),
billiard.exceptions.WorkerLostError: Worker exited prematurely: signal 9 (SIGKILL).
[2019-07-29 01:22:26,758: ERROR/MainProcess] Process 'ForkPoolWorker-22' pid:70 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:26,770: ERROR/MainProcess] Process 'ForkPoolWorker-21' pid:69 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:26,781: ERROR/MainProcess] Process 'ForkPoolWorker-20' pid:68 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:29,988: WARNING/MainProcess] process with pid=65 already exited
[2019-07-29 01:22:29,991: ERROR/MainProcess] Process 'ForkPoolWorker-24' pid:75 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:30,002: ERROR/MainProcess] Process 'ForkPoolWorker-23' pid:71 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:22:30,017: ERROR/MainProcess] Process 'ForkPoolWorker-16' pid:43 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,202: INFO/MainProcess] missed heartbeat from celery@b4b0a799a7ca
[2019-07-29 01:23:14,206: ERROR/MainProcess] Process 'ForkPoolWorker-28' pid:79 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,221: ERROR/MainProcess] Process 'ForkPoolWorker-27' pid:78 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,231: ERROR/MainProcess] Process 'ForkPoolWorker-26' pid:77 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,242: ERROR/MainProcess] Process 'ForkPoolWorker-25' pid:76 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:14,252: ERROR/MainProcess] Process 'ForkPoolWorker-14' pid:41 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,503: ERROR/MainProcess] Process 'ForkPoolWorker-33' pid:87 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,572: ERROR/MainProcess] Process 'ForkPoolWorker-32' pid:86 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,622: ERROR/MainProcess] Process 'ForkPoolWorker-31' pid:85 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,646: ERROR/MainProcess] Process 'ForkPoolWorker-30' pid:84 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:19,828: ERROR/MainProcess] Process 'ForkPoolWorker-29' pid:83 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:43,361: WARNING/MainProcess] process with pid=84 already exited
[2019-07-29 01:23:43,723: ERROR/MainProcess] Process 'ForkPoolWorker-38' pid:92 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:44,119: ERROR/MainProcess] Process 'ForkPoolWorker-37' pid:91 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:44,536: ERROR/MainProcess] Process 'ForkPoolWorker-36' pid:90 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:45,203: ERROR/MainProcess] Process 'ForkPoolWorker-35' pid:89 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:23:45,510: ERROR/MainProcess] Process 'ForkPoolWorker-34' pid:88 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:10,947: WARNING/MainProcess] process with pid=68 already exited
[2019-07-29 01:24:11,579: ERROR/MainProcess] Process 'ForkPoolWorker-43' pid:97 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:12,288: ERROR/MainProcess] Process 'ForkPoolWorker-42' pid:96 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:13,880: ERROR/MainProcess] Process 'ForkPoolWorker-41' pid:95 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:14,775: ERROR/MainProcess] Process 'ForkPoolWorker-40' pid:94 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:24:15,762: ERROR/MainProcess] Process 'ForkPoolWorker-39' pid:93 exited with 'signal 9 (SIGKILL)'
[2019-07-29 01:25:05,623: WARNING/MainProcess] process with pid=75 already exited

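Any suggestions as to what is going on and how to fix it?

It turned out that this was a problem related to AWS ECS rather than to the Airflow configuration itself.

Through more testing and monitoring with htop, I noticed that regardless of the number of worker nodes, one worker node would always spike its CPU continuously until the system killed it, as shown in the logs above.

The container/task definitions for the Airflow workers did not set CPU units explicitly, on the assumption that they would be managed automatically since it is not a required field.

Specifying enough CPU units so that each worker container/task is placed on its own EC2 instance resolved the issue.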
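
To see how often pool processes are being killed without reading the worker log by hand, something like the following can help. It is a rough sketch that assumes the Celery log line format shown in the logs above; the function name `sigkills_per_minute` is made up for illustration.

```python
import re
from collections import Counter

# Matches lines such as:
# [2019-07-29 01:21:46,127: ERROR/MainProcess] Process 'ForkPoolWorker-15' pid:42 exited with 'signal 9 (SIGKILL)'
SIGKILL_RE = re.compile(
    r"^\[(?P<ts>[\d\-: ,]+): ERROR/MainProcess\] "
    r"Process '(?P<name>ForkPoolWorker-\d+)' pid:(?P<pid>\d+) "
    r"exited with 'signal 9 \(SIGKILL\)'"
)


def sigkills_per_minute(lines):
    """Count SIGKILLed pool processes, bucketed by 'YYYY-MM-DD HH:MM'."""
    counts = Counter()
    for line in lines:
        match = SIGKILL_RE.match(line)
        if match:
            counts[match.group("ts")[:16]] += 1
    return counts


# A few lines copied from the Worker 1 log in the question.
sample = [
    "[2019-07-29 01:21:46,127: ERROR/MainProcess] Process 'ForkPoolWorker-15' pid:42 exited with 'signal 9 (SIGKILL)'",
    "[2019-07-29 01:21:49,853: ERROR/MainProcess] Process 'ForkPoolWorker-17' pid:62 exited with 'signal 9 (SIGKILL)'",
    "[2019-07-29 01:22:29,230: ERROR/MainProcess] Process 'ForkPoolWorker-18' pid:63 exited with 'signal 9 (SIGKILL)'",
    "[2019-07-29 01:22:44,002: INFO/MainProcess] missed heartbeat from celery@0f9db941bdd7",
]

for minute, count in sorted(sigkills_per_minute(sample).items()):
    print(minute, count)
```

A steady stream of SIGKILLs on one host, combined with high CPU in htop, points at resource starvation on that instance rather than at the broker or the scheduler settings.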
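
As a rough illustration of the fix: ECS counts 1024 CPU units per vCPU, and a t2.small has 1 vCPU, so any worker reservation above 512 units leaves room for only one worker per instance. The sketch below checks that arithmetic; the container name, image, and the value 768 are placeholders rather than the values actually used, and other containers that reserve CPU on the same instances would change the result.

```python
CPU_UNITS_PER_VCPU = 1024  # ECS convention: 1 vCPU = 1024 CPU units


def tasks_per_instance(instance_vcpus: int, task_cpu_units: int) -> int:
    """How many tasks with this CPU reservation fit on one instance."""
    return (instance_vcpus * CPU_UNITS_PER_VCPU) // task_cpu_units


# Hypothetical fragment of an ECS container definition for a worker.
# "name", "image" and the cpu value are placeholders for illustration.
worker_container = {
    "name": "airflow-worker",
    "image": "example/airflow:1.10.3",
    "cpu": 768,  # explicit CPU units; this field was left unset originally
    "essential": True,
}

T2_SMALL_VCPUS = 1

fit = tasks_per_instance(T2_SMALL_VCPUS, worker_container["cpu"])
print(f"workers per t2.small with cpu={worker_container['cpu']}: {fit}")
```

With a reservation like this the ECS scheduler cannot co-locate two workers on one t2.small, which matches the one-worker-per-instance placement described above.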
