Celery current_task.update_state not working



我正在尝试添加一个长时间运行的异步任务,并使其每隔一段时间更新进度状态。我使用的是Django rest框架服务器、RabbitMQ broker、Celery worker。我能够将任务添加到芹菜中,甚至能够在任务完成时成功检索结果。

然而,每当试图在任务完成前获得状态时,我都会得到以下信息:

task.result=无,task.state=PENDING,task.info=无

当试图获得完成后的状态时:

task.result=无,task.state=PENDING,task.info=无

DeployML/
manage.py
DeployML/
...
settings.py
tasks.py
dcelery.py
views.py
urls.py

dcelery.py:

import os
from celery import Celery
import sys
sys.path.append('.....DeployML')
sys.path.append('.....DeployML/DeployML')
# Set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DeployML.settings')
app = Celery('DeployML', backend='rpc://', broker='pyamqp://guest@localhost//', worker_state_db = '/tmp/celery_state')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')

tasks.py:

import os
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'DeployML.settings')
from celery import shared_task
from celery import current_task
from dcelery import app
import time
@app.task(bind=True)
def addShrd(self, x, y):
for i in range(4):
time.sleep(5)
progress_percent = i
#NOTE update_state not working yet
current_task.update_state(state='PROGRESS', meta={'current': i, 'total': 4})
return x + y

views.py:

#...
#...
@api_view(['GET', 'POST'])
def celer_view(request):
print("authentication: ", request.user, request.auth)
if request.method == 'GET':
result = addShrd.delay(17,10)
return Response({'task_id': str(result.id)})    
elif request.method == 'POST':
return Response({'some': 'POST response data'})    
return Response({'some': 'data'})
from celery.result import AsyncResult
@api_view(['GET', 'POST'])
def poll_state(request):
data = 'Fail'
print("requ: "+str(request.POST.get('task_id')))
if 'task_id' in request.POST.keys() and request.POST['task_id']:
task_id = request.POST['task_id']
task = AsyncResult(task_id)
print("meta data: "+ str(task.info))
data = str(task.result) + " " +str(task.state) + " | " + str(task.info)
else:
data = 'No task_id in the request'
return Response({'some': 'poll_state data: '+data})

编辑:我已经更新了views.py poll_state方法以使用celery.result.SyncResult,而之前我使用的是dcelery。AsyncResult。然而,这并没有解决问题。

第2版:此问题现已解决。我发现执行update_state和AsyncResult能够检索到它之间有几秒钟的延迟。由于我的任务运行时间不长(只有20秒(,在RabbitMQ保存中间进度之前,任务已经完成。然而,如果我将任务运行时间增加到100秒。我可以看到中间结果。

checkout Asyncresult。

也可以在此处检查用法:celery.result.SyncResult用法

相关内容

  • 没有找到相关文章

最新更新