所以这就是我想做的。我有一个每 X 分钟运行一次的计划任务。在任务中,我创建了一组任务,我希望它们彼此并行运行。在他们全部完成后,我想记录该组是否已成功完成。这是我的代码:
@shared_task(base=HandlersImplTenantTask, acks_late=True)
def my_scheduled_task():
try:
needed_ids = MyModel.objects.filter(some_field=False)
.filter(some_other_field=True)
.values_list("id", flat=True)
.order_by("id")
if needed_ids:
tasks = [my_single_task.s(needed_id=id) for id in needed_ids]
job = group(tasks)
result = job.apply_async()
returned_values = result.get()
if result.ready():
if result.successful():
logger.info("SUCCESSFULLY FINISHED ALL THE SUBTASKS")
else:
returned_values = result.get()
logger.info("UNSUCCESSFULLY FINISHED ALL THE SUBTASKS WITH THE RESULTS %s" % returned_values)
else:
logger.info("no needed ids found")
except:
logger.exception("got an unexpected exception while running task")
这是my_single_task代码:
@shared_task(base=HandlersImplTenantTask)
def my_single_task(needed_id):
logger.info("starting task for tenant: [%s]. got id [%s]", connection.tenant, needed_id)
return
这就是我经营芹菜的方式:manage.py 芹菜工人 -c 2 --broker=[我的兔子mq brocker url]
当我到达该行时result.get()
它挂起了。 我看到具有第一个 ID 的单个任务的单个日志条目,但我看不到其他 ID。 当我杀死我的芹菜进程并重新启动它时 - 它会重新运行计划任务,我看到第二个日志条目带有第二个 ID(从任务第一次运行开始)。 关于如何解决这个问题的任何想法?
编辑 - 为了尝试克服这个问题 - 我创建了一个名为"new_queue"的不同队列。我开始了另一个芹菜工人来听新的队列。我想让其他工人接受任务并完成它们。我认为这可以解决僵局问题。我已将代码更改为如下所示:
job = group(tasks)
job_result = job.apply_async(queue='new_queue')
results = job_result.get()
但我仍然遇到死锁,如果我删除results = job_result.get()
行,我可以看到任务是由主要工作人员处理的,并且没有将任何内容发布到new_queue队列中。有什么想法吗?这是我的芹菜配置:
tenant_celery_app.conf.update(CELERY_RESULT_BACKEND='djcelery.backends.database.DatabaseBackend'
CELERY_RESULT_DB_TABLENAMES = {
'task': 'tenantapp_taskmeta',
'group': 'tenantapp_groupmeta',
}
这是我运行工人的方式:
芹菜工人 -c 1 -Q new_queue --broker=[amqp_brocker_url]/[vhost]
芹菜工人 -c 1 --broker=[amqp_brocker_url]/[vhost]
因此,我一直在寻找的解决方案确实是创建一个新队列并启动一个处理新队列的新工作线程。我遇到的唯一问题是将组任务发送到新队列。这是对我有用的代码。
tasks = [my_single_task.s(needed_id=id).set(queue='new_queue') for id in needed_ids]
job = group(tasks)
job_result = job.apply_async()
results = job_result.get() # this will block until the tasks finish but it wont deadlock
这些是我的芹菜工人
celery worker -c 1 -Q new_queue --broker=[amqp_brocker_url]/[vhost]
celery worker -c 1 --broker=[amqp_brocker_url]/[vhost]
你似乎死锁了你的队列。想想吧。如果您有一个任务等待其他任务,并且队列已满,则第一个任务将永远挂起。
您需要重构代码以避免在任务中调用result.get()
(您可能已经在日志中对此发出警告)
我会推荐这个:
@shared_task(base=HandlersImplTenantTask, acks_late=True)
def my_scheduled_task():
needed_ids = MyModel.objects.filter(some_field=False)
.filter(some_other_field=True)
.values_list("id", flat=True)
.order_by("id")
if needed_ids:
tasks = [my_single_task.s(needed_id=id) for id in needed_ids]
job = group(tasks)
result = job.apply_async()
这就是您所需要的。
使用日志记录来跟踪任务是否失败。
如果应用程序中其他位置的代码需要跟踪作业是否失败,则可以使用 celery 的检查 api。