Python芹菜:如何不顺序加入任务结果



我有一个简单的项目,我创建了一大堆彼此无关的工作,创建任务,将其传递给redis,并使许多工人分布在一个Docker蜂拥而至,经过长期运行的任务队列。工人完成后,他们将完成的工作倾倒在NFS共享中,并将文本价值发送给芹菜客户端。

我正在使用asyncresult对象的结果集数组上的celery.result.resultset的.join()函数。join()包括一个回调,该回调(目前)只需打印结果。

我的问题是join()块,直到它以给定的顺序接收每个异步值。我的群是由许多机器截然不同的主机组成的,对我来说,在我完成的情况下,没有订单或一旦完成,对我来说很重要。

是否有通过芹菜在任务完成后正确触发回调功能的方法?我在网上查看了很多示例,似乎我唯一的选择是在Asyncio上尝试运气,但是Python并不是我的强大套房。

用于创建任务和Resultset OBJ的功能:

def populateQueue(encodeTasks):
r = ResultSet([])
taskHandles = {}
for task in encodeTasks:
    try:
        ret = encode.delay(task)
        r.add(ret)
        logging.debug("Task ID: " + str(ret.task_id))
        taskHandles[ret.task_id] = ret 
    except:
        logging.info("populateQueue fail: " + str(task.traceback))
logging.info("Tasks queued: " + str(len(taskHandles)))
return taskHandles, r

Main()的一部分等待结果:

        frameCountTotal = getFrameCount(targetFile)
        encodeTasks = buildCmdString(targetFile, frameCountTotal, clientCount)
        taskHandles, retSet = populateQueue(encodeTasks)
        logging.info("Waiting on tasks...")
        retSet.join(callback=testCallback)

预先感谢

找到了我自己问题的答案:

ResultSet具有另一种称为Join_native()的方法,我认为只要该经纪人是几种已知产品(RabbitMQ,Redis等),我认为它会使用更具体的API调用。芹菜的文档只是说,如果您满足经纪人的要求,它会提供更好的性能。文档没有说的是它允许订购返回(至少在Redis上,没有尝试过RMQ)。

相关内容

  • 没有找到相关文章

最新更新