使用具有并发性的Celery对特定任务进行串行处理



我有一个python/cerele设置:我有一条名为"task_queue;以及从不同传感器向其提供数据的多个python脚本。如果传感器值从高变为低,则有一个芹菜工作者从该队列中读取并向用户发送警报。工作线程有多个线程(我启用了自动缩放参数(,在一个传感器决定同时发送多条消息之前,一切都很好。这时我得到了竞争条件,可能会向用户发送多个警报,因为在线程存储已经发送警报的信息之前,很少有其他线程也会发送警报

我有n个传感器(n可以超过10000(,来自任何传感器的消息都应该按顺序处理。所以理论上我可以有n个线程,但那太过分了。我正在寻找一种最简单的方法,在x个线程(通常是10或20个(之间平均分配消息,这样我就不必每次增加x(或减少(时都(重新(编写路由函数并定义新的队列。

那么,是否有可能以某种方式将源自同一传感器的任务标记为以串行方式执行(当调用延迟或apply_async时(?或者我应该使用不同的队列/工作程序体系结构来实现这一点?

据我所知,您有一些任务可以同时运行,但有一个特定任务不能执行(此任务需要一次执行1个(。

(目前(没有办法设置特定任务队列的并发性,所以我认为在您的情况下,最好的方法是用多个工作者来处理这个问题。

假设您有以下队列:

  • queue_1在这里我们发送可以同时运行的任务
  • queue_2在这里,我们发送一次可以运行1的任务

您可以使用以下命令启动芹菜(如果您希望它们在同一台机器中(。

celery -A proj worker --loglevel=INFO --concurrency=10 -n worker1@%h -Q queue_1
celery -A proj worker --loglevel=INFO --concurrency=1 -n worker2@%h -Q queue_2

这将使具有并发10的worker1处理所有可以同时运行的任务,而worker2一次只处理需要为1的任务。

以下是一些文档参考:https://docs.celeryproject.org/en/stable/userguide/workers.html

注意:在这里,您需要指定运行队列的任务。这可以在使用apply_async调用时完成,可以直接从装饰器或其他方式进行。

最新更新