如何使用Apache Airflow配置为Celery设置SQS预定义队列



我正在使用带有Amazon SQS的芹菜配置Apache Airflow。我知道Celery允许broker_transport_options(https://docs.celeryproject.org/en/stable/getting-started/brokers/sqs.html)Airflow在其配置中包含一个名为cele_broker_transport_options的部分。

我知道我可以在Airflow配置中传递简单的字符串。例如,在.celery_broker_transport部分,我可以传递:

region = us-west-1

这相当于对芹菜说:

broker_transport_options = {'region': 'us-west-1'}

我正试图在Airflow中传递预定义队列选项,它在Celery中看起来如下:

broker_transport_options = {
'predefined_queues': {
'my-q': {
'url': 'https://ap-southeast-2.queue.amazonaws.com/123456/my-q',
'access_key_id': 'xxx',
'secret_access_key': 'xxx',
}
}
}

我不确定如何将这些信息传递给Airflow。我尝试了以下操作,但我收到一个错误,说"str"对象没有属性"items":

predefined_queues = 'my-q': { 'url': 'https://sqs.us-east-1.amazonaws.com/1234567890/my-q', }

您不能通过气流配置文件直接设置此值。相反,您需要使用celery_config_options配置值来指向在python代码中设置predefined_queues的模块。

像这样的东西可能就是你想要的:

from airflow.config_templates.default_celery import DEFAULT_CELERY_CONFIG
CELERY_CONFIG = {
**DEFAULT_CELERY_CONFIG,
"broker_transport_options": {
**DEFAULT_CELERY_CONFIG["broker_transport_options"],
"predefined_queues": {
"my-q": { "url": "https://sqs.us-east-1.amazonaws.com/1234567890/my-q" },
},
},
}

如果你把它放在一个名为celery_config.py的文件中,那么你应该能够在配置文件中设置celery_config_options = celery_config.CELERY_CONFIG,并正确地配置芹菜。

相关内容

  • 没有找到相关文章

最新更新