芹菜 - 异步任务链



我正在尝试开发一个概念验证,以便在使用带有 redis 作为代理的芹菜上进行异步任务链。

该程序是带有/run 和三个函数的 Flask API,这些函数需要作为异步任务运行,例如从 a(( 返回是 b(( 的参数,返回 b(( 是 c(( 的参数,它通过集合对象"集合"将数据写入 mongodb 集合。

@celery.task
def a(param):
    print("Original: {0}".format(param))
    print("Inside Task 1")
    param.update({"timestamp_A":str(datetime.timestamp), "result_A":True})
    print(param)
    return param
@celery.task
def b(param):
    print("Inside Task 2")
    param.update({"timestamp_B":str(datetime.timestamp), "result_B":True})
    print(param)
    return param
@celery.task
def c(param):
    print("Inside Task 3")
    collection.insert(dict(param))
    print("Output Saved to DB")

@app.route('/run', methods = ['GET'])
def run():
    if request.method != 'GET':
        return "HTTP Method not allowed"
    if request.method == 'GET':
        T = 1000
        for num in range(0, T):
            ds = {"test": num}
            chain(a.s(ds) | b.s() | c.s()).apply_async()
        return "Process Complete"
if __name__ == '__main__':
    app.run(debug=True)

使用上面的代码,任务链接有效,即 a(( 使用其参数执行,但要执行函数 b((,它首先等待整个数据在 a(( 中排队,然后才执行 b((。我需要一旦执行任何任务 a((,它就应该交给 b(( 等等。有人对我可能出错的地方有任何指示吗?

我可能错过了一些东西,但似乎最简单的方法是在上一个任务结束时调用下一个任务。

@celery.task
def a(arg):
  ret = calc(arg)
  b.apply_async(ret)
@celery.task
def b(arg):
  ret = calc(arg)
  c.apply_async(ret)
@celery.task
def c(arg):
  ret = calc(arg)
  mongo.store(ret)

这不允许你有时调用循环,有时不调用,但你可以将任务包装在同步运行内部部分的外部任务中。

从您提供的描述来看,听起来您应该使用链条。链完全按照您的要求执行,将任务集中在一起,将每个任务的返回值传递给链中的下一个任务。

相关内容

  • 没有找到相关文章

最新更新