如何复制 Django Celery worker



我在 Celery 中有一个记录器(使用 RabbitMQ),并希望在紧急情况下复制它的工作。

# tasks.py
@task
def log(message):
    with open('test.txt', 'a') as f:
        f.write(message)

# views.py
log.delay(message)

如何使不同机器上的 2 个 Celery 实例在调用时log()运行?

这样做有意义吗?

这在 RabbitMQ 中是可能的。如果您有基于主题的交换,很明显,一条消息可以放入两个不同的队列中,并独立地传递给 2 个接收方。

sender =>
[message, routing_key=event.logging.log] => [queue A, topic=event.#]     
                                                      => receiver 1
                                         => [queue B, topic=*.logging.*]
                                                      => receiver 2
消息

将被发送到两个队列,并且它们都不会从另一个队列窃取消息。

为此,

您必须将交换配置为主题交换(如您所说):

CELERY_QUEUES = {
   'celery': {
       'exchange': 'celerytopic',
       'exchange_type': 'topic',
       'routing_key': 'celery',
   },
}

然后,您可以使用 AMQP API 创建备份交换:

 from celery import current_app as celery
 with celery.broker_connection() as conn:
     conn.default_channel.queue_declare(queue='celery.backup', durable=True)
     conn.default_channel.queue_bind(queue='celery.backup',
                                     exchange='celerytopic',
                                     routing_key='celery',
                                     durable=True)

由于您已经有一个名为芹菜的队列,因此您可能必须先删除它:

$ camqadm queue.delete celery

尝试在两台不同的机器上启动此任务对我来说是没有意义的。至少 Celery 不能保证一个任务会在不同的机器上运行——是 RabbitMQ 分配负载,如果一个节点的负载比另一个节点少——运行的两个任务可能会在那台机器上执行......

使用任务。retry相反。如果任务执行失败,Celery 将重试任务。芹菜足够聪明,可以理解任务是否失败。只需确保在任务失败时引发一些异常,如果无法成功记录,则不要静默返回。

更新:

可能的工作流可能是 - 尝试执行任务,如果任务失败,on_retry更改routing_key,并尝试在不同的交换/队列中执行任务,这可能是您的故障转移队列。

相关内容

  • 没有找到相关文章

最新更新