Django Celery:任务永远不会执行



在我的django应用程序中,我正在使用芹菜。在post_save信号中,我正在弹性搜索中更新索引。但是由于某种原因,任务被挂起并且从未实际执行代码:

我用来跑芹菜的:

celery -A collegeapp worker -l info

信号:

@receiver(post_save, sender=University)
def university_saved(sender, instance, created, **kwargs):
"""
University save signal
"""
print('calling celery task')
update_university_index.delay(instance.id)
print('finished')

任务:

@task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(instance_id):
print('updating university index')

我得到的唯一输出是calling celery task. 等待 30 多分钟后,它永远不会进入任何其他打印语句,视图继续等待。芹菜终端中从未显示过任何东西。

版本:姜戈 3.0, 芹菜 4.3, 雷迪斯 5.0.9, 乌班图18

更新:在进行一些测试后,使用celery.py文件中定义的debug_task代替update_university_index不会导致挂起。它的行为符合预期。我想也许可能是app.taskvstask装饰器,但似乎不是这样。

@app.task(bind=True)
def debug_task(text, second_value):
print('printing  debug_task {} {}'.format(text, second_value))

我仍然不确定为什么它不起作用,但我找到了将task替换为app.task的解决方案

从我的celery.py导入app似乎已经解决了这个问题。

from collegeapp.celery import app
@app.task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(self, instance_id):
print('updating university index')

这曾经发生在我身上,我犯了一个最愚蠢的错误,django 告诉我们在tasks.py文件中指定芹菜任务,并将其用于任务发现。在那之后它奏效了。您能否使用tree命令更深入地了解目录结构?

本教程适用于flask,但在 django 中也可以实现相同的目的。这个特殊的教程的亮点在于,在您告诉芹菜执行任务后,它还为您提供了一个uuid,您可以ping该 url 并监控您触发的任务的进度。

验证任务是否已由芹菜使用(请确保芹菜正在运行(:

from celery.task.control import inspect
i = inspect()
i.registered_tasks()

或猛击

$ celery inspect registered
$ celery -A collegeapp inspect registered

从 https://docs.celeryproject.org/en/latest/faq.html#the-worker-isn-t-doing-anything-just-hanging

为什么 Task.delay/apply*/worker 只是挂起?

答:某些 AMQP 客户端中存在一个错误,如果它无法对当前用户进行身份验证、密码不匹配或用户无权访问指定的虚拟主机,则会使其挂起。请务必检查您的代理日志(对于大多数系统上为/var/log/rabbitmq/rabbit.log的 RabbitMQ(,它通常包含一条描述原因的消息。

更改此行

@task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(instance_id):
print('updating university index')

@task(name="update_university_index", bind=True, default_retry_delay=5, max_retries=1, acks_late=True)
def update_university_index(self, instance_id):
print('updating university index')

或者将self添加到任务定义中。

相关内容

  • 没有找到相关文章

最新更新