我有一个简单的项目,我创建了一大堆彼此无关的工作,创建任务,将其传递给redis,并使许多工人分布在一个Docker蜂拥而至,经过长期运行的任务队列。工人完成后,他们将完成的工作倾倒在NFS共享中,并将文本价值发送给芹菜客户端。
我正在使用asyncresult对象的结果集数组上的celery.result.resultset的.join()函数。join()包括一个回调,该回调(目前)只需打印结果。
我的问题是join()块,直到它以给定的顺序接收每个异步值。我的群是由许多机器截然不同的主机组成的,对我来说,在我完成的情况下,没有订单或一旦完成,对我来说很重要。
是否有通过芹菜在任务完成后正确触发回调功能的方法?我在网上查看了很多示例,似乎我唯一的选择是在Asyncio上尝试运气,但是Python并不是我的强大套房。
用于创建任务和Resultset OBJ的功能:
def populateQueue(encodeTasks):
r = ResultSet([])
taskHandles = {}
for task in encodeTasks:
try:
ret = encode.delay(task)
r.add(ret)
logging.debug("Task ID: " + str(ret.task_id))
taskHandles[ret.task_id] = ret
except:
logging.info("populateQueue fail: " + str(task.traceback))
logging.info("Tasks queued: " + str(len(taskHandles)))
return taskHandles, r
Main()的一部分等待结果:
frameCountTotal = getFrameCount(targetFile)
encodeTasks = buildCmdString(targetFile, frameCountTotal, clientCount)
taskHandles, retSet = populateQueue(encodeTasks)
logging.info("Waiting on tasks...")
retSet.join(callback=testCallback)
预先感谢
找到了我自己问题的答案:
ResultSet具有另一种称为Join_native()的方法,我认为只要该经纪人是几种已知产品(RabbitMQ,Redis等),我认为它会使用更具体的API调用。芹菜的文档只是说,如果您满足经纪人的要求,它会提供更好的性能。文档没有说的是它允许订购返回(至少在Redis上,没有尝试过RMQ)。