我正在使用芹菜将任务发送到远程服务器并尝试返回结果。使用远程服务器上update_state方法不断更新任务的状态。
我正在使用发送任务
app.send_task('task_name')
获取芹菜任务的结果是一个阻塞调用,我不希望我的 Django 应用程序等待结果和超时。
所以我尝试运行另一个芹菜任务来获得结果。
@app.task(ignore_result=True)
def catpure_res(task_id):
task_obj = AsyncResult(task_id)
task_obj.get(on_message=on_msg)
但它会导致以下错误。
Traceback (most recent call last):
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 367, in trace_task
R = retval = fun(*args, **kwargs)
File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 622, in __protected_call__
return self.run(*args, **kwargs)
File "/home/arpit/project/appname/tasks/results.py", line 42, in catpure_res
task_obj.get(on_message=on_msg)
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 168, in get
assert_will_not_block()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 44, in assert_will_not_block
raise RuntimeError(E_WOULDBLOCK)
RuntimeError: Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks
此错误是否有任何解决方法。我是否必须运行守护进程才能获得结果?
使用allow_join_result。请参阅下面的代码片段。
@app.task(ignore_result=True)
def catpure_res(task_id):
task_obj = AsyncResult(task_id)
with allow_join_result():
task_obj.get(on_message=on_msg)
注意:如其他答案中所述,它可能会导致性能问题甚至死锁,但是如果您的任务编写良好并且不会导致意外错误,那么它应该像魅力一样工作。
正如你的标题所解释的那样,在任务中调用get
是一种不好的做法,可能会导致死锁。 相反,您可以检查任务状态,并在准备就绪时get
结果:
result = catpure_res.AsyncResult(task_id, app=app)
if result.ready():
return result.get()
return result.state
您可以将上述代码片段包装在一个函数中,并每 x 秒请求一次。
编辑:考虑您的评论:
您可以改为获取
result.state
,并将retry
机制与countdown
一起使用,直到任务result.state == SUCCESS
。您可以添加芹菜
beat
以运行定期任务,以检查主要任务是否结束。请注意,使用如此繁重的任务(持续时间长(也是一种不好的做法。 考虑将其分解为一个小任务,并使用 canvas 将它们组合在一起。
from celery.result import allow_join_result
task_obj = send_task("do_something", [arg1, arg2, arg3])
with allow_join_result():
def on_msg(*args, **kwargs):
print(f"on_msg: {args}, {kwargs}")
try:
result = task_obj.get(on_msg=on_msg, timeout=timeout_s)
except TimeoutError as exc:
print("Timeout!")