我希望某个任务一次只运行一个。其他类型的任务可以在并发工作线程中运行。
我不能使用芹菜的例子,因为如果一个任务已经在运行,我将丢失新任务。
我可以在一段时间后再次重试任务,但如果同时出现一个新任务(第三个(,如果第一个任务已经完成,那么它将被运行(第二个任务仍在等待(,这是我不想要的。我想保持订单。
我知道最简单的事情是有两个工人和两个队列。一个具有>1 并发的工作线程监视其他任务的队列,另一个只有 1 个并发的工作线程将仅监视one_at_a_time
类型任务的队列。但我只想使用一个具有>1 并发的工作线程。
我有一种将队列直接存储在redis
的方法:
@app.task()
def one_at_a_time_task(arg):
if redis.queue.length == 0:
redis.queue.add(arg)
call_my_task()
else:
redis.queue.add(arg)
def call_my_task():
while(redis.queue.length > 0):
taskArgs = redis.queue.first
# actual task logic ...
redis.queue.dequeue
我将不得不锁定队列,以免发生竞争条件,但这是可行的(我认为(。
有没有更好/更简单的方法可以做到这一点?也许我在哪里不需要维护队列,可以直接使用芹菜的队列?
我重新阅读了您的问题描述,它说"一次一个任务",所以我的想法 2 行不通。因此,应使用简单的 for 循环或"想法 1"(一个并发设置为 1 的芹菜工作线程(。
idea 0:不要使用芹菜。 只需执行一个 for 循环即可处理每个任务。
想法 1:一个并发设置为 1 的芹菜工作线程
- 严格来说,如果您想一次处理一个任务并保持任务的顺序。
- 您可以启动一个工作线程。 然后将并发设置为 1。
- 然后,这一个工作人员将一次处理一个任务并保持任务的顺序。
- 但这不会提供任何优势,相反,与仅使用for循环来处理所有任务而无需芹菜或芹菜工人相比,只会带来更多的复杂性。
想法 2:N 个并发设置为 M 的工作线程
- 实现更快处理的一种方法是做一些类似于 Python 的
multiprocess.pool.Pool
(或multiprocess.pool.ThreadPool
(的事情,它有一个函数可以做你想要的,称为map
或starmap
。 - 使用芹菜实现这一目标
- 您将使用 M 并发启动 N 个工作线程
- 然后你可以有一个for循环来提交所有任务
- 在列表中追加每个异步结果
- 然后有第二个 for 循环以相同的顺序等待每个 AsyncResult
- 这将使用Celery实现NxM并行处理,并且仍然确保您的结果顺序相同
- 另外,如果我提出的设计听起来太难实现,您可以使用内置的 Celery
map
,starmap
,chunks
已经为您实现了我建议的内容:请参阅 Celery 文档 https://docs.celeryq.dev/en/stable/userguide/canvas.html#map-starmap