向气流添加额外的芹菜配置



有人知道我可以在哪里向气流芹菜执行器添加额外的芹菜配置吗?例如,我想 http://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-pool-restarts 此属性,但如何允许额外的芹菜属性。

使用刚刚发布的 Airflow 1.9.0,现在可以配置了。

在气流中.cfg有这样一行:

# Import path for celery configuration options
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG

它指向导入路径中的 Python 文件。当前默认版本可以 https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/config_templates/default_celery.py

如果您需要无法通过该文件调整的设置,请创建一个新模块,说"my_celery_config.py":

CELERY_CONFIG = {
# ....
}

并将其放入您的AIRFLOW_HOME目录中(即与 DAGS/文件夹一起(,然后在配置中设置celery_config_options = my_celery_config.CELERY_CONFIG

如果您在 Docker 中运行 Airflow 并且想要更改 Celery 配置,则需要执行以下操作:

  1. dags文件夹所在的同一级别创建一个 Airflowconfig文件夹(如果您还没有(,并添加一个自定义 celery 配置文件(例如custom_celery_config.py(那里。

  2. 更改custom_celery_config.py中的默认芹菜配置。这个想法是这个python脚本应该包含一个变量,其中包含默认的Celery配置以及您对它的更改。例如,如果您想更改 Celery 的task_queues配置,您的custom_celery_config.py应如下所示:

    from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
    from kombu import Exchange, Queue
    CELERY_TASK_QUEUES = [
    Queue('task1', Exchange('task1', type='direct'), routing_key='task1', queue_arguments={'x-max-priority': 8}),
    Queue('task2', Exchange('task2', type='direct'), routing_key='task2', queue_arguments={'x-max-priority': 6}),
    ]
    CELERY_CONFIG = {
    **DEFAULT_CELERY_CONFIG,
    "task_queues": CELERY_TASK_QUEUES
    }
    
  3. config文件夹挂载到docker-compose.yml

    volumes:
    - /data/airflow/config:/opt/airflow/config
    
  4. docker-compose.yml中像这样设置 Celery 配置(因为 Docker 可以看到config文件夹,所以它可以访问你的custom_celery_config.py(:

    AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS: 'custom_celery_config.CELERY_CONFIG'
    
  5. 重新启动气流网络服务器、调度程序等。

参考:这里。

有关 Celery 配置的详细信息,请查看此文档。

相关内容

  • 没有找到相关文章

最新更新