如何在芹菜工作者中一次只执行一个任务,而不会丢失它并保持队列中任务的顺序



我希望某个任务一次只运行一个。其他类型的任务可以在并发工作线程中运行。

我不能使用芹菜的例子,因为如果一个任务已经在运行,我将丢失新任务。

我可以在一段时间后再次重试任务,但如果同时出现一个新任务(第三个(,如果第一个任务已经完成,那么它将被运行(第二个任务仍在等待(,这是我不想要的。我想保持订单。

我知道最简单的事情是有两个工人和两个队列。一个具有>1 并发的工作线程监视其他任务的队列,另一个只有 1 个并发的工作线程将仅监视one_at_a_time类型任务的队列。但我只想使用一个具有>1 并发的工作线程。

我有一种将队列直接存储在redis的方法:

@app.task()
def one_at_a_time_task(arg):
if redis.queue.length == 0:
redis.queue.add(arg)
call_my_task()
else:
redis.queue.add(arg)
def call_my_task():
while(redis.queue.length > 0):
taskArgs = redis.queue.first
# actual task logic ...
redis.queue.dequeue

我将不得不锁定队列,以免发生竞争条件,但这是可行的(我认为(。

有没有更好/更简单的方法可以做到这一点?也许我在哪里不需要维护队列,可以直接使用芹菜的队列?

我重新阅读了您的问题描述,它说"一次一个任务",所以我的想法 2 行不通。因此,应使用简单的 for 循环或"想法 1"(一个并发设置为 1 的芹菜工作线程(。


idea 0:不要使用芹菜。 只需执行一个 for 循环即可处理每个任务。

想法 1:一个并发设置为 1 的芹菜工作线程

  • 严格来说,如果您想一次处理一个任务并保持任务的顺序。
  • 您可以启动一个工作线程。 然后将并发设置为 1。
  • 然后,这一个工作人员将一次处理一个任务并保持任务的顺序。
  • 但这不会提供任何优势,相反,与仅使用for循环来处理所有任务而无需芹菜或芹菜工人相比,只会带来更多的复杂性。

想法 2:N 个并发设置为 M 的工作线程

  • 实现更快处理的一种方法是做一些类似于 Python 的multiprocess.pool.Pool(或multiprocess.pool.ThreadPool(的事情,它有一个函数可以做你想要的,称为mapstarmap
  • 使用芹菜实现这一目标
    • 您将使用 M 并发启动 N 个工作线程
    • 然后你可以有一个for循环来提交所有任务
    • 在列表中追加每个异步结果
    • 然后有第二个 for 循环以相同的顺序等待每个 AsyncResult
  • 这将使用Celery实现NxM并行处理,并且仍然确保您的结果顺序相同
  • 另外,如果我提出的设计听起来太难实现,您可以使用内置的 Celerymapstarmapchunks已经为您实现了我建议的内容:请参阅 Celery 文档 https://docs.celeryq.dev/en/stable/userguide/canvas.html#map-starmap

相关内容

  • 没有找到相关文章