如何使用docker-compose在分布式气流结构上配置cerceler-worker



我正在建立一个分布式Airflow集群,其中除了芹菜工作者之外的所有其他东西都在一台主机上运行,处理在多台主机上完成。Airflow 2.0设置使用Airflow文档中给出的yaml文件进行配置https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml。在最初的测试中,当我在同一台主机上运行所有东西时,我使体系结构能够很好地工作。问题是,如何在远程主机上启动芹菜工作者?

到目前为止,我试图创建一个上述docker compose的修剪版本,其中我只在worker主机上启动芹菜worker,而没有其他内容。但我在数据库连接方面遇到了一些问题。在修剪后的版本中,我更改了URL,使其指向运行db和redis的主机。

dags、日志、插件和postgresqldb位于所有主机都可见的共享驱动器上。

我应该如何进行配置?你知道该检查什么吗?连接等。?芹菜工人码头工人组成配置:

---
version: '3'
x-airflow-common:
&airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment:
&airflow-common-env
AIRFLOW_UID: 50000
AIRFLOW_GID: 50000
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: 
postgresql+psycopg2://airflow:airflow@airflowhost.example.com:8080/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@airflow@airflowhost.example.com:8080/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@airflow@airflowhost.example.com:6380/0
AIRFLOW__CORE__FERNET_KEY: ''
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
AIRFLOW__CORE__LOAD_EXAMPLES: 'true'
AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth'
REDIS_PORT: 6380
volumes:
- /airflow/dev/dags:/opt/airflow/dags
- /airflow/dev/logs:/opt/airflow/logs
- /airflow/dev/plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
services:
airflow-remote-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always

编辑1:我在处理日志文件方面仍然有一些困难。共享日志目录似乎并不能解决日志文件丢失的问题。我在main上添加了extra_host定义,并打开了工作机器上的端口8793。工作任务失败,日志为:

*** Log file does not exist: 
/opt/airflow/logs/tutorial/print_date/2021-07- 
01T13:57:11.087882+00:00/1.log
*** Fetching from: http://:8793/log/tutorial/print_date/2021-07-01T13:57:11.087882+00:00/1.log
*** Failed to fetch log file from worker. Unsupported URL protocol ''

远不是;最终设置";,以下是一些对我有用的设置,这些设置使用了核心节点中Airflow的docker compose和Worker:

主节点:

  • 工作节点必须可以从运行Webserver的主节点访问。我发现这个CeleryExecutor体系结构图对解决问题非常有帮助。

    在尝试读取日志时,如果在本地找不到日志,它将尝试从远程工作线程检索日志。因此,您的主节点可能不知道工作者的主机名,因此您可以更改主机名的解析方式(hostname_callable设置,默认为socket.getfqdn),也可以简单地向Webserver添加名称解析功能。这可以通过在x-airflow-common定义中添加extra_hosts配置密钥来实现:

---
version: "3"
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment: &airflow-common-env
...# env vars
extra_hosts:
- "worker-01-hostname:worker-01-ip-address" # "worker-01-hostname:192.168.0.11"
- "worker-02-hostname:worker-02-ip-address"

*请注意,在您有共享驱动器的特定情况下,我认为日志将在本地找到

  • 定义并行DAG并发调度器解析进程。可以使用env-vars完成:
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment: &airflow-common-env
AIRFLOW__CORE__PARALLELISM: 64
AIRFLOW__CORE__DAG_CONCURRENCY: 32
AIRFLOW__SCHEDULER__PARSING_PROCESSES: 4

当然,要设置的值取决于您的具体案例和可用资源。这篇文章对这个主题有一个很好的概述DAG设置也可以在DAG定义中被覆盖。

工作节点:

  • 定义workerCELERY__WORKER_CONCURRENCY,默认值可以是机器上可用的CPU数量(文档)。

  • 定义如何访问在主节点中运行的服务。设置IP或主机名,并注意主节点中匹配的暴露端口:

x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.1.0}
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CELERY__WORKER_CONCURRENCY: 8
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@main_node_ip_or_hostname:5432/airflow # 5432 is default postgres port
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@main_node_ip_or_hostname:5432/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@main_node_ip_or_hostname:6379/0
  • 共享相同的Fernet Key和Secret Key,从"中读取它们;。env";文件:
environment: &airflow-common-env
AIRFLOW__CORE__FERNET_KEY: ${FERNET_KEY}
AIRFLOW__WEBSERVER__SECRET_KEY: ${SECRET_KEY}
env_file:
- .env

.env文件:FERNET_KEY=jvYUaxxxxxxxxxxxxx=

  • 集群中的每个节点(主节点和辅助节点)都应用了相同的设置,这是关键

  • 为辅助服务定义主机名,以避免自动生成与容器id匹配的主机名。

  • 暴露端口8793,这是用于从工作者(文档)获取日志的默认端口:

services:
airflow-worker:
<<: *airflow-common
hostname: ${HOSTNAME}
ports:
- 8793:8793
command: celery worker
restart: always
  • 确保每个工作节点主机都使用相同的时间配置运行,几分钟的差异可能会导致严重的执行错误,而这些错误可能不那么容易找到。考虑在主机操作系统上启用NTP服务

如果您通常有繁重的工作负载和高并发性,您可能需要调整Postgres设置,如max_connectionsshared_buffers。这同样适用于诸如ip_local_port_rangesomaxconn之类的主机OS网络设置。

在我在初始集群设置期间遇到的任何问题中,Flower和工作程序执行日志总是提供有用的详细信息和错误消息,包括任务级别日志和Docker Compose服务日志,即:docker-compose logs --tail=10000 airflow-worker > worker_logs.log

希望对你有用!

以下注意事项建立在已接受的答案之上,因为我认为它们可能与任何新的Airflow Celery设置有关:

  • 在分布式设置中,启用远程日志记录通常很方便,可以作为集中日志的一种方式。Airflow本机支持远程日志记录,请参阅例如this或this
  • 定义worker_autoscale而不是concurrency将允许在工作负载增加/减少时动态启动/停止新流程
  • 将工人环境中的环境变量DUMB_INIT_SETSID设置为0可以实现热关机(请参阅文档)
  • 在指向Airflow的base_log_folder的Docker Compose中将卷添加到worker中,可以安全地在本地持久保存worker日志。示例:
# docker-compose.yml
services:
airflow-worker:
...
volumes:
- worker_logs:/airflow/logs
...
...
volumes:
worker_logs:

最新更新