芹菜的等效multiprocessing.JoinableQueue
(或gevent.queue.JoinableQueue
)是什么?
我正在寻找的功能是能够从发布者.join()
一个芹菜任务队列,等待队列中的所有任务完成。
等待初始的AsyncResult
或GroupResult
是不够的,因为队列是由工人自己动态填充的。
这可能不是完美的,但这是我最终想到的。
它基本上是一个JoinableQueue
包装在现有的芹菜队列之上,基于共享Redis计数器和列表侦听器。它要求队列名称与其路由键相同(由于before_task_publish
和task_postrun
信号的内部实现细节)。
joinableceleryqueue.py :
from celery.signals import before_task_publish, task_postrun
from redis import Redis
import settings
memdb = Redis.from_url(settings.REDIS_URL)
class JoinableCeleryQueue(object):
def __init__(self, queue):
self.queue = queue
self.register_queue_hooks()
def begin(self):
memdb.set(self.count_prop, 0)
@property
def count_prop(self):
return "jqueue:%s:count" % self.queue
@property
def finished_prop(self):
return "jqueue:%s:finished" % self.queue
def task_add(self, routing_key, **kw):
if routing_key != self.queue:
return
memdb.incr(self.count_prop)
def task_done(self, task, **kw):
if task.queue != self.queue:
return
memdb.decr(self.count_prop)
if memdb.get(self.count_prop) == "0":
memdb.rpush(self.finished_prop, 1)
def register_queue_hooks(self):
before_task_publish.connect(self.task_add)
task_postrun.connect(self.task_done)
def join(self):
memdb.brpop(self.finished_prop)
我选择使用BRPOP
而不是pub/sub,因为我只需要一个侦听器侦听"所有任务完成"事件(发布者)。
使用JoinableCeleryQueue
非常简单- begin()
在将任何任务添加到队列之前,使用常规芹菜API添加任务,.join()
等待所有任务完成。