我需要用SQS
代理替换我的redis
代理,在谷歌搜索时,我遇到了许多页面,这些页面告诉如何将SQS
与celery
一起使用。根据我的理解,它会创建自己的 SQS 队列,我只有一个任务,想使用已经创建的 SQS 队列。
默认情况下,芹菜将使用队列前缀设置(如果已定义)为您创建一个新队列。
但是,如果要使用现有队列,则可以提供具有task-default-queue
设置的名称。在这种情况下,请确保不要定义上面提到的队列前缀。
您可以通过broker_transport_options
(在芹菜 4.0 中)设置队列名称,如下所示:
broker_transport_options = {"queue_name_prefix": "my-queue-"}
文档在这里
2020 年 2 月 26 日的提交增加了使用预定义队列的功能。
您应该能够通过向CELERY_BROKER_TRANSPORT_OPTIONS添加预定义队列选项来使用预定义队列
CELERY_BROKER_TRANSPORT_OPTIONS={
'predefined_queues':{
'HIGH_PRIORITY': {
'url': 'https://sqs.ap-south-1.amazonaws.com/030221/HGH_PRIORITY',
'access_key_id': config('AWS_ACCESS_KEY'),
'secret_access_key': config('AWS_SECRET_KEY'),
},
}
}
以下是提交的文档更新 -
Other Features supported by this transport:
Predefined Queues:
The default behavior of this transport is to use a single AWS credential
pair in order to manage all SQS queues (e.g. listing queues, creating
queues, polling queues, deleting messages).
If it is preferable for your environment to use a single AWS credential, you
can use the 'predefined_queues' setting inside the 'transport_options' map.
This setting allows you to specify the SQS queue URL and AWS credentials for
each of your queues. For example, if you have two queues which both already
exist in AWS) you can tell this transport about them as follows:
transport_options = {
'predefined_queues': {
'queue-1': {
'url': 'https://sqs.us-east-1.amazonaws.com/xxx/aaa',
'access_key_id': 'a',
'secret_access_key': 'b',
},
'queue-2': {
'url': 'https://sqs.us-east-1.amazonaws.com/xxx/bbb',
'access_key_id': 'c',
'secret_access_key': 'd',
},
}
}
如果您已经在 SQS 中创建了一个队列,并且其名称为"my_super_queue",那么要在芹菜中使用它,您应该按如下方式定义配置:
broker_url = f"sqs://{aws_access_key}:{aws_secret_key}@"
result_backend = 'file://results' # Or whatever option but 'rpc'
task_default_queue = "my_super_queue"
broker_transport_options = {
'visibility_timeout': 100, # YOU DECIDE THIS NUMBER
'region': 'us-west-2', # DON'T FORGET THIS
}
请记住输入您登录时使用的用户的凭据。 还要记住向该用户授予正确的权限(在我的情况下,我给了它 AmazonSQSFullAccess)
通过提供凭据(访问和密钥),您无需在broker_url中指定任何 URL。这是因为在使用给定凭据连接时,您可以访问 SQS 队列的列表。它将尝试使用task_default_queue中指定的现有队列,如果找不到它,它将创建它。
我没有在这里指定queue_name_prefix的值(在broker_transport_options内部),但是如果您这样做,要使用的(或创建)队列的最终名称将是两个queue_name_prefix的串联,后跟task_default_queue。
考虑如果创建的队列是SQS FIFO队列,则必须以".fifo"结尾,因此在这种情况下它将是my_super_queue.fifo
我已经使用以下代码queue_name_prefix... 它在生产环境中运行的Flask和Django应用程序中。
from celery import Celery
def make_celery(app):
celery = Celery(
app.import_name,
broker="sqs://",
broker_transport_options={
"queue_name_prefix": "{SERVICE_ENV}-{SERVICE_NAME}-"
},
)
task_base = celery.Task
class ContextTask(task_base):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return task_base.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery