如何在芹菜任务中锁定变量?



我有一个芹菜任务,它破坏了一些变量。如果我设置一个芹菜工作线程,它就会完美运行,但是当我使用并发时,一切都搞砸了。如何锁定变量被破坏的关键部分?

inb4:使用 Python 3.6,Redis 既作为代理又作为结果支持。 线程。锁在这里无济于事。

只要芹菜在多个工作器(进程(上运行,线程锁定就无济于事,因为它在单个进程中工作。此外,当您控制整个过程时,可以使用线程锁,而使用芹菜则无法实现这一目标。

这意味着芹菜需要分布式锁。对于 django,我总是使用 django-cache,如:这里。如果您需要更多通用锁,尤其是基于 Redis 的锁,适用于任何 python 应用程序,您可以使用 sherlock。

我知道这是一个 2+ 年的问题,但现在我正在调整我的芹菜配置,我来到了这个话题。

我在Linux机器中使用python 2.7和Django 1.11和celery 4。我正在使用 rabbitmq 作为经纪人。

我的配置意味着让芹菜作为守护程序和芹菜节拍运行以处理计划任务。

因此,对于给定任务的专用队列,您可以使用并发性=1(子进程(的工作器(进程(配置此队列。

此解决方案解决了 celery 运行任务的并发问题,但在代码中,如果在不使用 celery 的情况下运行任务,则不会遵循并发原则。

示例代码:

CELERY_TASK_QUEUES = (
Queue('celery_periodic', default_exchange, routing_key='celery_periodic'),
Queue('celery_task_1', celery_task_1_exchange, routing_key='celery_task_1'),
)
default_exchange            = Exchange('celery_periodic', type='direct')
celery_task_1_exchange      = Exchange('celery_task_1', type='direct')
CELERY_BEAT_SCHEDULE = {
'celery-task-1': {
'task':     'tasks.celery_task_1',
'schedule': timedelta(minutes=15),
'queue':    'celery_task_1'
},
}

最后,在/etc/default/celeryd 中(文档在这里:https://docs.celeryproject.org/en/latest/userguide/daemonizing.html#example-configuration(:

CELERYD_NODES="worker1 worker2"
CELERYD_OPTS="--concurrency=1 --time-limit=600 -Q:worker1 celery_periodic -Q:worker2 celery_task_1"
-

-并发 N 表示您的工作线程实例将有 N 个工作器子进程(意味着工作线程实例可以处理 N 个并行任务((从这里开始:https://stackoverflow.com/a/44903753/9412892(。

在这里阅读更多: https://docs.celeryproject.org/en/stable/userguide/workers.html#concurrency

溴, 爱德华多

最新更新