我正在尝试调用一个创建和弦的方法,并在我在另一个方法中构建的另一个和弦中使用该和弦。但是我无法以正确的顺序调用回调。
#celery obj
app = Celery('tasks', backend=BACKEND, broker=BROKER)
以下是我用来测试这一点的简单方法:
@app.task
def first_task(args):
return 'first_task'
@app.task
def first_body(args):
return 'first_body'
@app.task
def second_body(args):
return 'second_body'
@app.task
def third_body(args):
return 'third_body'
以下是我创建和弦的方式:
@app.task
def simple_chord(args):
c = chord(group([first_task.s(args)]), body=first_body.s())
return c()
# this works as expected
# first_task -> first_body -> second_body -> third_body
def test_chords():
c = chord(group([chord(group([chord(group([first_task.s('foo')]),
body=first_body.s())]),
body=second_body.s())]),
body=third_body.s())
c.delay()
# this does not work as expected
# first_task -> second_body -> first_body -> third_body
def test_chords_2():
c = chord(group([chord(group([simple_chord.s('foo')]),
body=second_body.s())]),
body=third_body.s())
c.delay()
当我像test_chords
一样在一个地方构建和弦时,它会做我希望它做的事情(first_task -> first_body -> second_body -> third_body(。但是,我希望能够获得我在simple_chord
方法中构建的和弦,并在另一个和弦中使用它,就像在test_chords_2
中一样,但它不会等待first_body
执行后再执行second_body
。它改为按以下顺序执行:first_task -> second_body -> first_body -> third_body
我在工作线程控制台中看到的内容:
[2018-06-25 16:55:21,466: INFO/MainProcess] Received task: tasks.simple_chord[c78eb259-e1e1-4b4d-b053-247afce4d536]
[2018-06-25 16:55:21,660: INFO/MainProcess] Received task: tasks.first_task[fca138d2-c672-4299-a2ed-1bb5cacb66da]
[2018-06-25 16:55:21,685: INFO/ForkPoolWorker-1] Task tasks.simple_chord[c78eb259-e1e1-4b4d-b053-247afce4d536] succeeded in 0.19740361299773213s: <AsyncResult: c4db6bbf-d150-44db-a34e-99ea6bd71012>
[2018-06-25 16:55:21,688: INFO/MainProcess] Received task: tasks.second_body[afc32047-3239-4118-bb2c-c116f9a241bf]
[2018-06-25 16:55:23,340: INFO/ForkPoolWorker-1] Task tasks.first_task[fca138d2-c672-4299-a2ed-1bb5cacb66da] succeeded in 1.639255696994951s: 'first_task'
[2018-06-25 16:55:23,344: INFO/MainProcess] Received task: tasks.first_body[c4db6bbf-d150-44db-a34e-99ea6bd71012]
[2018-06-25 16:55:24,099: INFO/ForkPoolWorker-1] Task tasks.second_body[afc32047-3239-4118-bb2c-c116f9a241bf] succeeded in 0.7374479610007256s: 'second_body'
[2018-06-25 16:55:24,103: INFO/MainProcess] Received task: tasks.third_body[9a90c2da-cd03-4f55-9675-9194acd7d42f]
[2018-06-25 16:55:24,656: INFO/ForkPoolWorker-1] Task tasks.first_body[c4db6bbf-d150-44db-a34e-99ea6bd71012] succeeded in 0.5298690330091631s: 'first_body'
[2018-06-25 16:55:25,247: INFO/ForkPoolWorker-1] Task tasks.third_body[9a90c2da-cd03-4f55-9675-9194acd7d42f] succeeded in 0.5770523219980532s: 'third_body'
我的问题是做这样的事情的最佳方法是什么?
通过在test_chords_2
中正常调用simple_chord
(而不是作为签名(解决了该问题。我让它不必要地异步运行:
def test_chords_2():
c = chord(group([chord(group([simple_chord('foo')]),
body=second_body.s())]),
body=third_body.s())
print(c)
c.delay()