无法使用 Celery + Django 导入 tasks.py 中的模型



我想创建一个后台任务来更新特定日期的记录。我将Django和Celery与RabbitMQ一起使用。

我已经设法在模型保存时调用了这个任务,这个任务函数是:

任务.py

from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery('tasks', broker='amqp://localhost//')

@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
# (I pass the news id and return it, nothing complicated about it)
return news_id

这个任务是从模型中的save((方法调用的。py

from django.db import models
from celery import current_app

class News(models.model):
(...)
def save(self, *args, **kwargs):
current_app.send_task('news.tasks.update_news_status', args=(self.id,))
super(News, self).save(*args, **kwargs)

问题是我想在tasks.py中导入我的新闻模型,但如果我想这样做:

from .models import News

我得到这个错误:

django.core.exceptions.ImproverlyConfigure:请求的设置DEFAULT_INDEX_TABLESPACE,但未配置设置。你必须定义环境变量DJANGO_SETTING_MODULE或调用settings.config((,然后再访问设置。

这就是mi芹菜.py看起来像的原因

from __future__ import absolute_import, unicode_literals
from celery import Celery
import os
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myapp.settings')
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

我已经试过了:

  1. 可以';t将django模型导入芹菜任务
  2. 我已经尝试在任务方法Django和Celery中进行导入,AppRegisteredNotReady异常
  3. 我也尝试过这个Celery-在tasks.py中导入模型
  4. 我还试图创建一个utils.py并导入它,但这是不可能的

并遇到不同的错误,但最终我无法在任务中导入任何模块。py

我的配置可能有问题,但我看不到错误,我遵循了the Celery Docs:Django 的第一步

此外,我的项目结构如下:

├── myapp
│   ├── __init__.py
├── ├── celery.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── news
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── tasks.py
│   ├── urls.py
│   ├── models.py
│   ├── views.py
├── manage.py

我正在从myapp目录执行工作程序,如下所示:

celery -A news.tasks worker --loglevel=info

我在这里错过了什么?提前感谢您的帮助!

lambda:设置。已安装的应用程序

编辑

在做出评论中建议的更改后:将此添加到celery.pyapp.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

和导入内部方法:tasks.py

from __future__ import absolute_import
from celery import Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery('tasks', broker='amqp://localhost//')

@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
from .models import News
return news_id

我得到以下错误:

[2018-07-20 12:24:29,337: ERROR/ForkPoolWorker-1] Task news.tasks.update_news_status[87f9ec92-c260-4ee9-a3bc-5f684c819f79] raised unexpected: ValueError('Attempted relative import in non-package',)
Traceback (most recent call last):
File "/Users/carla/Develop/App/backend/myapp-venv/lib/python2.7/site-packages/celery/app/trace.py", line 382, in trace_task
R = retval = fun(*args, **kwargs)
File "/Users/carla/Develop/App/backend/myapp-venv/lib/python2.7/site-packages/celery/app/trace.py", line 641, in __protected_call__
return self.run(*args, **kwargs)
File "/Users/carla/Develop/App/backend/news/tasks.py", line 12, in update_news_status
from .models import News
ValueError: Attempted relative import in non-package

好吧,所以对于任何正在努力解决这个问题的人来说。。。原来我的celery.py没有从设置中读取env变量。

经过一周的大量研究,我意识到Celery不是Django的一个进程,而是一个在Django之外运行的进程(duh(,所以当我试图加载设置时,它们被加载了,但后来我无法访问我在.env中定义的env变量(我使用dotenv库(Celery试图在我的.bash_profile中查找env变量(当然(

因此,最终我的解决方案是在定义celery.py的同一目录中创建一个助手模块,称为load_env.py,带有以下

from os.path import dirname, join
import dotenv

def load_env():
"Get the path to the .env file and load it."
project_dir = dirname(dirname(__file__))
dotenv.read_dotenv(join(project_dir, '.env'))

然后在我的芹菜.py上(注意最后一个导入和第一个指令(

from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
from .load_env import load_env
load_env()
# set the default Django settings module for the 'celery' program.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
app = Celery('myapp')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object('myapp.settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

在加载了对load_env()env变量的调用并且芹菜工作者可以访问这些变量之后。通过这样做,我现在可以从我的任务访问其他模块。py,这是我的主要问题。

这要归功于这些人(Caktus咨询集团(和他们的django项目模板,因为如果没有他们,我就找不到答案。谢谢

试试这样的方法。它在3.1芹菜中工作,导入应该发生在save方法内部和super((之后

from django.db import models

class News(models.model):
(...)
def save(self, *args, **kwargs):
(...)
super(News, self).save(*args, **kwargs)
from task import update_news_status
update_news_status.apply_async((self.id,)) #apply_async or delay

在这里我要做的是(Django 1.11和芹菜4.2(,您的芹菜配置中有一个问题,您试图重新声明celery实例:

任务.py

from myapp.celery import app # would contain what you need :)
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
@app.task(name='news.tasks.update_news_status')
def update_news_status(news_id):
# (I pass the news id and return it, nothing complicated about it)
return news_id

芹菜.py

from __future__ import absolute_import, unicode_literals
from celery import Celery
from django.conf import settings
import os
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myapp.settings")
app = Celery('myapp', backend='rpc://', broker=BROKER_URL) # your config here
app.config_from_object('django.myapp:settings', namespace='CELERY') # change here
app.autodiscover_tasks()

型号.py

from django.db import models
class News(models.model):
(...)
def save(self, *args, **kwargs):
super(News, self).save(*args, **kwargs)
from news.tasks import update_news_status
update_news_status.delay(self.id) # change here

并使用celery -A myapp worker --loglevel=info启动它,因为你的应用程序是在myapp.celery中定义的,所以-一个参数需要是conf声明为的应用程序

相关内容

  • 没有找到相关文章

最新更新