I am using Celery with Flask, and I have the following task:
import random
import time

@celery.task(bind=True, max_retries=5)
def failing_task(self):
    time.sleep(random.randint(4, 10))
    try:
        raise ValueError('Exception occurred')
    except Exception as e:
        # self.retry() raises celery.exceptions.Retry, so nothing after it runs
        self.retry(exc=e, countdown=int(random.uniform(2, 3) ** self.request.retries))
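For context on how long each retry waits, here is a small sketch of the delay range produced by the `countdown` expression in the task. The helper name `countdown_bounds` is hypothetical and exists only for illustration; it takes the two ends of the `random.uniform(2, 3)` interval and applies the same `int(base ** retries)` formula.

```python
def countdown_bounds(retries, low=2, high=3):
    """Smallest and largest value of int(base ** retries) for base in [low, high]."""
    return int(low ** retries), int(high ** retries)


# self.request.retries starts at 0 on the first attempt
for retries in range(5):
    shortest, longest = countdown_bounds(retries)
    print('retries={}: countdown between {}s and {}s'.format(retries, shortest, longest))
```

So the delay is always 1 second after the first failure and grows to somewhere between 16 and 81 seconds by the time `self.request.retries` is 4.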
I created a Flask endpoint to access the task result:
from celery.result import AsyncResult
from flask import jsonify

@app.route('/api/task/<task_id>')
def task_status(task_id):
    task = AsyncResult(task_id)
    task_result = task.result
    # Exceptions are not JSON serializable, so convert them to a string
    if isinstance(task_result, Exception):
        task_result = '{e.__class__.__name__}: {e}'.format(e=task_result)
    app.logger.info(task.info)
    content = {'task_id': task.id,
               'state': task.state,
               'result': task_result}
    return jsonify(content)
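I want to return the retry attempt number so that I can display it on the web page. I know I can use self.request.retries inside the task, but I have not figured out how to access it from the AsyncResult object.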
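After hours of research, I solved it by adding the required information to the exception object. Since this information is not serialized by the JSON serializer, I also needed to switch to the pickle serializer.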
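To illustrate the serializer point in isolation, here is a minimal sketch that uses only the standard library and needs no Celery worker. The function `to_json_payload` is a hypothetical stand-in for a JSON encoding that keeps only the exception type name and message (an assumption about the general shape, not Celery's actual code); the point is that custom attributes survive a pickle round trip but have no place in such a JSON payload.

```python
import json
import pickle


def to_json_payload(exc):
    # Hypothetical simplification: only the type name and args are kept
    return json.dumps({'exc_type': type(exc).__name__,
                       'exc_message': list(exc.args)})


exc = ValueError('Exception occurred in failing task')
exc.retry_number = 2   # extra attributes, as set in the task
exc.max_retries = 5

# pickle stores the instance __dict__, so the attributes come back
restored = pickle.loads(pickle.dumps(exc))
print(restored.retry_number, restored.max_retries)

# the JSON payload has no field for them
json_payload = json.loads(to_json_payload(exc))
print('retry_number' in json_payload)
```

Note that accepting pickle content means the worker will unpickle whatever arrives on the broker, so this is only reasonable when the broker is trusted.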
app/tasks.py
import random
import time

from celery import Celery

celery = Celery('tasks', broker=constants.CELERY_URL, backend=constants.CELERY_URL)
celery.conf.CELERYD_TASK_SOFT_TIME_LIMIT = 120
# pickle is needed so the extra attributes on the exception survive serialization
celery.conf.CELERY_RESULT_SERIALIZER = 'pickle'
celery.conf.CELERY_TASK_SERIALIZER = 'pickle'
celery.conf.CELERY_ACCEPT_CONTENT = ['json', 'pickle']

@celery.task(bind=True)
def failing_task(self):
    try:
        time.sleep(random.randint(4, 10))
        raise ValueError('Exception occurred in failing task')
    except Exception as e:
        # Attach retry metadata to the exception so it is stored with the result
        e.retry_number = self.request.retries + 1
        e.max_retries = self.max_retries
        self.retry(exc=e, countdown=int(random.uniform(2, 3) ** self.request.retries))
app/main.py
@app.route('/api/task/<task_id>')
def task_status(task_id):
    task = AsyncResult(task_id)
    content = {'task_id': task.id,
               'state': task.state,
               'result': task.result}
    if isinstance(task.result, Exception):
        content['result'] = '{e.__class__.__name__}: {e}'.format(e=task.result)
        content['retry_number'] = task.result.retry_number
        content['max_retries'] = task.result.max_retries
    return jsonify(content)