我正在使用带有Amazon SQS的芹菜配置Apache Airflow。我知道Celery允许broker_transport_options(https://docs.celeryproject.org/en/stable/getting-started/brokers/sqs.html)Airflow在其配置中包含一个名为cele_broker_transport_options的部分。
我知道我可以在Airflow配置中传递简单的字符串。例如,在.celery_broker_transport部分,我可以传递:
region = us-west-1
这相当于对芹菜说:
broker_transport_options = {'region': 'us-west-1'}
我正试图在Airflow中传递预定义队列选项,它在Celery中看起来如下:
broker_transport_options = {
'predefined_queues': {
'my-q': {
'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
'access_key_id': 'xxx',
'secret_access_key': 'xxx',
}
}
}
我不确定如何将这些信息传递给Airflow。我尝试了以下操作,但我收到一个错误,说"str"对象没有属性"items":
predefined_queues = 'my-q': { 'url': 'https://sqs.us-east-1.amazonaws.com/1234567890/my-q', }
您不能通过气流配置文件直接设置此值。相反,您需要使用celery_config_options
配置值来指向在python代码中设置predefined_queues
的模块。
像这样的东西可能就是你想要的:
from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
CELERY_CONFIG = {
**DEFAULT_CELERY_CONFIG,
"broker_transport_options": {
**DEFAULT_CELERY_CONFIG["broker_transport_options"],
"predefined_queues": {
"my-q": { "url": "https://sqs.us-east-1.amazonaws.com/1234567890/my-q" },
},
},
}
如果你把它放在一个名为celery_config.py
的文件中,那么你应该能够在配置文件中设置celery_config_options = celery_config.CELERY_CONFIG
,并正确地配置芹菜。