有人知道我可以在哪里向气流芹菜执行器添加额外的芹菜配置吗?例如,我想 http://docs.celeryproject.org/en/latest/userguide/configuration.html#worker-pool-restarts 此属性,但如何允许额外的芹菜属性。
使用刚刚发布的 Airflow 1.9.0,现在可以配置了。
在气流中.cfg有这样一行:
# Import path for celery configuration options
celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG
它指向导入路径中的 Python 文件。当前默认版本可以 https://github.com/apache/incubator-airflow/blob/1.9.0/airflow/config_templates/default_celery.py
如果您需要无法通过该文件调整的设置,请创建一个新模块,说"my_celery_config.py":
CELERY_CONFIG = {
# ....
}
并将其放入您的AIRFLOW_HOME目录中(即与 DAGS/文件夹一起(,然后在配置中设置celery_config_options = my_celery_config.CELERY_CONFIG
。
如果您在 Docker 中运行 Airflow 并且想要更改 Celery 配置,则需要执行以下操作:
-
在
dags
文件夹所在的同一级别创建一个 Airflowconfig
文件夹(如果您还没有(,并添加一个自定义 celery 配置文件(例如custom_celery_config.py
(那里。 -
更改
custom_celery_config.py
中的默认芹菜配置。这个想法是这个python脚本应该包含一个变量,其中包含默认的Celery配置以及您对它的更改。例如,如果您想更改 Celery 的task_queues
配置,您的custom_celery_config.py
应如下所示:from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG from kombu import Exchange, Queue CELERY_TASK_QUEUES = [ Queue('task1', Exchange('task1', type='direct'), routing_key='task1', queue_arguments={'x-max-priority': 8}), Queue('task2', Exchange('task2', type='direct'), routing_key='task2', queue_arguments={'x-max-priority': 6}), ] CELERY_CONFIG = { **DEFAULT_CELERY_CONFIG, "task_queues": CELERY_TASK_QUEUES }
-
将
config
文件夹挂载到docker-compose.yml
:volumes: - /data/airflow/config:/opt/airflow/config
在 docker-compose.yml
中像这样设置 Celery 配置(因为 Docker 可以看到config
文件夹,所以它可以访问你的custom_celery_config.py
(:AIRFLOW__CELERY__CELERY_CONFIG_OPTIONS: 'custom_celery_config.CELERY_CONFIG'
重新启动气流网络服务器、调度程序等。
参考:这里。
有关 Celery 配置的详细信息,请查看此文档。