>问题
我已将长时间运行的任务划分为逻辑子任务,因此我可以在完成每个子任务时报告其结果。但是,我正在尝试报告实际上永远不会完成的任务的结果(而是在进行时产生值(,并且正在努力使用我现有的解决方案来做到这一点。
背景
我正在为我编写的一些Python程序构建一个Web界面。用户可以通过 Web 表单提交作业,然后返回查看作业进度。
假设我有两个函数,每个函数通过单独的表单访问:
-
med_func
:执行需要~1分钟,结果传递给render()
,从而产生额外的数据。 -
long_func
:返回生成器。每个yield
大约需要 30 分钟,应向用户报告。有这么多的收益,我们可以认为这个迭代器是无限的(仅在撤销时终止(。
代码,当前实现
对于med_func
,我报告的结果如下:
在表单提交时,我将AsyncResult
保存到 Django 会话:
task_result = med_func.apply_async([form], link=render.s())
request.session["task_result"] = task_result
结果页面的 Django 视图访问此AsyncResult
。当任务完成后,结果被保存到一个对象中,该对象作为上下文传递给 Django 模板。
def results(request):
""" Serve (possibly incomplete) results of a session's latest run. """
session = request.session
try: # Load most recent task
task_result = session["task_result"]
except KeyError: # Already cleared, or doesn't exist
if "results" not in session:
session["status"] = "No job submitted"
else: # Extract data from Asynchronous Tasks
session["status"] = task_result.status
if task_result.ready():
session["results"] = task_result.get()
render_task = task_result.children[0]
# Decorate with rendering results
session["render_status"] = render_task.status
if render_task.ready():
session["results"].render_output = render_task.get()
del(request.session["task_result"]) # Don't need any more
return render_to_response('results.html', request.session)
此解决方案仅在函数实际终止时有效。我不能将long_func
的逻辑子任务链接在一起,因为有未知数量的yield
(long_func
循环的每次迭代可能不会产生结果(。
问题
是否有任何明智的方法可以从运行时间极长的 Celery 任务中访问生成的对象,以便在生成器耗尽之前显示它们?
为了让 Celery 知道任务的当前状态是什么,它会在你拥有的任何结果后端设置一些元数据。 您可以捎带它来存储其他类型的元数据。
def yielder():
for i in range(2**100):
yield i
@task
def report_progress():
for progress in yielder():
# set current progress on the task
report_progress.backend.mark_as_started(
report_progress.request.id,
progress=progress)
def view_function(request):
task_id = request.session['task_id']
task = AsyncResult(task_id)
progress = task.info['progress']
# do something with your current progress
我不会在那里扔大量数据,但它非常适合跟踪长时间运行的任务的进度。
保罗的回答很棒。作为使用mark_as_started
的替代方法,您可以使用Task
的update_state
方法。他们最终会做同样的事情,但"update_state"这个名字更适合你想做的事情。您可以选择定义一个自定义状态来指示您的任务正在进行中(我将我的自定义状态命名为"PROGRESS"(:
def yielder():
for i in range(2**100):
yield i
@task
def report_progress():
for progress in yielder():
# set current progress on the task
report_progress.update_state(state='PROGRESS', meta={'progress': progress})
def view_function(request):
task_id = request.session['task_id']
task = AsyncResult(task_id)
progress = task.info['progress']
# do something with your current progress
芹菜部分:
def long_func(*args, **kwargs):
i = 0
while True:
yield i
do_something_here(*args, **kwargs)
i += 1
@task()
def test_yield_task(task_id=None, **kwargs):
the_progress = 0
for the_progress in long_func(**kwargs):
cache.set('celery-task-%s' % task_id, the_progress)
Web客户端,启动任务:
r = test_yield_task.apply_async()
request.session['task_id'] = r.task_id
测试最后产生的值:
v = cache.get('celery-task-%s' % session.get('task_id'))
if v:
do_someting()
如果您不喜欢使用缓存,或者不可能使用缓存,则可以使用数据库,文件或任何其他芹菜工人和服务器端都可以访问的地方。使用缓存是最简单的解决方案,但工作线程和服务器必须使用相同的缓存。
需要考虑的几个选项:
1 -- 任务组。 如果可以枚举调用时的所有子任务,则可以将组作为一个整体应用(返回一个 TaskSetResult 对象,可用于监视整个组或组中单个任务的结果(在需要检查状态时根据需要查询此对象。
2 -- 回调。 如果你不能枚举所有的子任务(或者即使你可以!(,你可以定义一个Web钩子/回调,这是任务的最后一步 - 在任务的其余部分完成时调用。挂钩将针对应用中引入结果并通过数据库或应用内部 API 使其可用 URI。
这些组合可以解决您的挑战。
另请参阅一位Instagram工程师的出色PyCon preso。
http://blogs.vmware.com/vfabric/2013/04/how-instagram-feeds-work-celery-and-rabbitmq.html
在视频标记16:00,他讨论了他们如何构建一长串子任务。