我目前正在使用芹菜中的Task.delay()
(使用RabbitMQ(来执行异步任务。这些任务分配给 8 个工作线程,在这些任务中,我正在执行数据库插入操作。
有时任务会失败,并且不会发生数据库插入。这可能是由于超时或 JSON 解码错误造成的。我想捕获执行失败的情况。
以下是相关代码:
views.py
def celery_url_filtering(request):
for each_data in dataset:
#each_data is a Json object
res = result.delay(each_data)
while(res.status == 'PENDING'):
pass
return JsonResponse({'ok':'Success'})
tasks.py
@app.task
def result(dataeach_data):
# Parse each_data and do data insertion here
return "Something"
如何在列表中捕获失败的执行?
从我从您的关注中可以理解的是,您想查看"挂起"或"失败"任务并重试/应用一些应用程序逻辑。
如果是这样,您可以根据需要按固定时间表、每天/每小时等运行 cron。此 cron 作业可以根据您拥有的计划捕获在最后一天/小时内失败的任务。
你可以使用 django-celery-beat 来设置 cron 作业,使用 Django ORM 使用 django-celery-result 来存储芹菜任务结果。
例如,您可以有这样的芹菜任务
from celery import shared_task
from django_celery_results.models import TaskResult
**tasks.py**
@shared_task(name="failed_task_cron", bind=True)
def failed_task_cron(self, **kwargs):
"""
Celery task to run on daily schedule to do something with the failed tasks
"""
tasks = TaskResult.objects.filter(status='FAILURE')
# tasks is a queryset of all the failed tasks
# Perform application logic
您可以在芹菜设置中像这样为上述任务设置 cron
from celery.schedules import crontab
# ...
CELERY_BEAT_SCHEDULE = {
"failed_task_cron": {
"task": "path/to/tasks.failed_task_cron",
"schedule": crontab(hour=1) # Runs every hour
}
}