我有一个Django应用程序,它使用Celery来卸载一些任务。主要是,它推迟了数据库表中某些字段的计算。
所以,我有一项任务。py:
from models import MyModel
from celery import shared_task
@shared_task
def my_task(id):
qs = MyModel.objects.filter(some_field=id)
for record in qs:
my_value = #do some computations
record.my_field = my_value
record.save()
在模型中.py
from django.db import models
from tasks import my_task
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
my_task.delay(id)
现在很明显,由于循环导入(models
导入tasks
,tasks
导入models
),这将不起作用。
目前我已经通过从views.py
调用my_task.delay()
解决了这个问题,但将模型逻辑保留在模型类中似乎是有意义的。有更好的方法吗?
joshua发布的解决方案非常好,但当我第一次尝试时,我发现我的@receiver
装饰器没有效果。这是因为tasks
模块没有导入到任何位置,这是我使用任务自动发现时所期望的。
然而,还有另一种方法可以将tasks.py
与modules.py
解耦。也就是说,任务可以按名称发送,并且在发送任务的过程中不必对其进行评估(导入):
from django.db import models
#from tasks import my_task
import celery
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
#my_task.delay(id)
celery.current_app.send_task('myapp.tasks.my_task', (id,))
send_task()
是Celery应用程序对象上的一个方法。
在此解决方案中,重要的是要为任务指定正确、可预测的名称。
在您的模型中,您可以在使用my_task
之前导入它,而不是在文件开头导入CCD_12。这将解决循环导入问题。
from django.db import models
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
from tasks import my_task # import here instead of top
my_task.delay(id)
或者,您也可以在tasks.py
中做同样的事情。您可以在使用模型之前导入模型,而不是开始导入。
替代方案:
您可以使用send_task
方法调用您的任务
from celery import current_app
from django.db import models
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
current_app.send_task('myapp.tasks.my_task', (id,))
为了在这个列表中再添加一个不太好的解决方案,我最终所做的是依赖django现在内置的应用程序注册表。
因此,在tasks.py
中,您可以使用apps.get_model()
来访问模型,而不是从模型导入。
我用一个带有健康文档的辅助方法来做这件事,只是为了表达为什么这很痛苦:
from django.apps import apps
def _model(model_name):
"""Generically retrieve a model object.
This is a hack around Django/Celery's inherent circular import
issues with tasks.py/models.py. In order to keep clean abstractions, we use
this to avoid importing from models, introducing a circular import.
No solutions for this are good so far (unnecessary signals, inline imports,
serializing the whole object, tasks forced to be in model, this), so we
use this because at least the annoyance is constrained to tasks.
"""
return apps.get_model('my_app', model_name)
然后:
@shared_task
def some_task(post_id):
post = _model('Post').objects.get(pk=post_id)
当然,您可以直接使用apps.get_model()
。
使用信号。
tasks.py
from models import MyModel, my_signal
from celery import shared_task
from django.dispatch import receiver
@shared_task
def my_task(id):
qs = MyModel.objects.filter(some_field=id)
for record in qs:
my_value = #do some computations
record.my_field = my_value
record.save()
@receiver(my_signal)
def my_receiver(sender, **kwargs):
my_task.delay(kwargs['id'])
型号.py
from django.db import models
from tasks import my_task
from django.dispatch import Signal
my_signal = Signal(providing_args=['id'])
class MyModel(models.Model):
field1 = models.IntegerField()
#more fields
my_field = models.FloatField(null=True)
@staticmethod
def load_from_file(file):
#parse file, set fields from file
my_signal.send(sender=?, id=?)
不确定这是否是其他人的问题,但我花了几个小时,找到了解决方案。。。主要是文件中的关键:
使用@shared_task装饰器
您编写的任务可能存在于可重用的应用程序中,而可重用应用程序不能依赖于项目本身,因此您也不能直接导入应用程序实例。
基本上我所做的就是。。。
####
# project/coolapp/tasks.py -- DON'T DO THIS
import os
from celery import Celery
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery("coolapp")
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()
@app.task(bind=True)
def some_task(self, some_id):
from coolapp.models import CoolPerson
####
# project/coolapp/__init__.py -- DON'T DO THIS
from __future__ import absolute_import, unicode_literals
from .tasks import app as celery_app
__all__ = ("celery_app",)
因此,我收到了关于缺少应用程序标签的奇怪错误(这清楚地表明了循环导入)。
解决方案。。。
重构器project/coolapp/tasks.py
->19和CCD_ 20->project/project/__init__.py
。
重要信息:这不会(也不应该)添加到INSTALLED_APPS
中。否则,您将获得循环导入。
因此,开始工作:
celery -A project.project worker -l INFO
此外,还有一个调试技巧
当你想知道你的任务是否被正确发现时,把它放在project/project/app.py
:中
app.autodiscover_tasks()
assert "project.app.tasks.some_task" in app.tasks
否则,你将不得不启动工作程序,却发现你的任务不包括在应用程序中,然后你将不得不等待关闭。