我在 Celery 中有一个记录器(使用 RabbitMQ),并希望在紧急情况下复制它的工作。
# tasks.py
@task
def log(message):
with open('test.txt', 'a') as f:
f.write(message)
# views.py
log.delay(message)
如何使不同机器上的 2 个 Celery 实例在调用时log()
运行?
这样做有意义吗?
这在 RabbitMQ 中是可能的。如果您有基于主题的交换,很明显,一条消息可以放入两个不同的队列中,并独立地传递给 2 个接收方。
sender =>
[message, routing_key=event.logging.log] => [queue A, topic=event.#]
=> receiver 1
=> [queue B, topic=*.logging.*]
=> receiver 2
消息将被发送到两个队列,并且它们都不会从另一个队列窃取消息。
您必须将交换配置为主题交换(如您所说):
CELERY_QUEUES = {
'celery': {
'exchange': 'celerytopic',
'exchange_type': 'topic',
'routing_key': 'celery',
},
}
然后,您可以使用 AMQP API 创建备份交换:
from celery import current_app as celery
with celery.broker_connection() as conn:
conn.default_channel.queue_declare(queue='celery.backup', durable=True)
conn.default_channel.queue_bind(queue='celery.backup',
exchange='celerytopic',
routing_key='celery',
durable=True)
由于您已经有一个名为芹菜的队列,因此您可能必须先删除它:
$ camqadm queue.delete celery
尝试在两台不同的机器上启动此任务对我来说是没有意义的。至少 Celery 不能保证一个任务会在不同的机器上运行——是 RabbitMQ 分配负载,如果一个节点的负载比另一个节点少——运行的两个任务可能会在那台机器上执行......
使用任务。retry
相反。如果任务执行失败,Celery 将重试任务。芹菜足够聪明,可以理解任务是否失败。只需确保在任务失败时引发一些异常,如果无法成功记录,则不要静默返回。
更新:
可能的工作流可能是 - 尝试执行任务,如果任务失败,on_retry更改routing_key,并尝试在不同的交换/队列中执行任务,这可能是您的故障转移队列。