芹菜 - 在 tasks.py 中导入模型



我在访问 tasks.py 中的模型时遇到问题

我的目标是在应用程序的各个部分发送电子邮件(用户注册,重置密码等)。为此,我将用户 ID 传递给名为"send_email"的芹菜任务。

@shared_task()
def send_email(sender_id=None, receiver_id=None, type=None, message=None):
sender = User.objects.get(id=sender_id)
receiver = User.objects.get(id=receiver_id)
logger.info("Starting send email")
Email.send_email(sender, receiver, type, message)
logger.info("Finished send email")

然后,任务需要使用 id 检索用户并向他们发送电子邮件。尝试将用户模型导入 tasks.py 文件时,这会崩溃。

我收到错误

raise AppRegistryNotReady("Apps aren't loaded yet.") django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

我尝试过的事情

  1. 在文件顶部调用 django.setup() tasks.py - 原因

    raise RuntimeError("populate() isn't reentrant") 
    

    放入方法时也会导致相同的错误send_email。这些是关于SO中其他类似问题的建议

  2. 以"send_email"方法导入模型,允许工作线程启动,但会导致以下错误

    raise AppRegistryNotReady("Apps aren't loaded yet.") 
    

    这是对SO中类似问题的另一个建议

  3. 在调用"send_email"函数时删除 .delay,该函数有效(在文件 tasks.py 顶部或send_email方法中导入),但由于任务不再是异步的,因此没有任何好处,但可能会缩小问题范围?

注意事项?

  1. 我使用扩展 AbstractBaseUser 的自定义用户模型,我在芹菜中看到了许多与此相关的 github 问题,但我相信这些问题应该在芹菜 v3.1 左右修复
  2. 我使用的是芹菜 v4.1、django 1.11.10、python 2.7,并且正在使用 RabbitMQ 作为代理并在虚拟环境中运行工人/服务器。我正在使用

    celery -A api worker -l info
    

    在终端窗口中,然后使用 PyCharm 的终端启动服务器

    python manage.py runserver
    

    所以有效地有 2 个环境?这可能是问题所在吗?

  3. 这可能相关,也可能不相关,但为了让我的自定义用户模型在我的应用程序/模型中工作.py我只有一行导入用户模型,否则我会得到一个

    django.core.exceptions.ImproperlyConfigured: AUTH_USER_MODEL refers to model 'myApi.User' that has not been installed
    
  4. 我尝试将身份验证模型设置为"myApi.user.User"(用户是声明模型的文件夹,但得到一个

    Invalid model reference 'myApi.user.User'. String model references must be of the form 'app_label.ModelName'.
    

    所以我猜这就是为什么在 myApi/models 中需要导入的原因.py以便可以在这里拾取它?

项目结构

├── api
│   ├── __init__.py
│   ├── settings.py
│   ├── urls.py
│   └── wsgi.py
├── celerySettings.py # my celery.py
├── db.sqlite3
├── myApi
│   ├── __init__.py
│   ├── admin.py
│   ├── apps.py
│   ├── tasks.py
│   ├── urls.py
│   ├── user
│   │   ├── __init__.py
│   │   ├── managers.py
│   │   ├── models.py
│   │   ├── serializers.py
│   │   ├── urls.py
│   │   └── views.py
│   ├── utils
│   │   └── Email.py
│   ├── views.py
├── manage.py
└── static

tasks.py

from __future__ import absolute_import, unicode_literals
from celery.schedules import crontab
from celery.task import periodic_task
from celery.utils.log import get_task_logger
from celery import shared_task
from celery import current_app
from .user.models import User
from .utils import Email
logger = get_task_logger(__name__)
@shared_task()
def send_email(sender_id=None, receiver_id=None, type=None, message=None):
sender = User.objects.get(id=sender_id)
receiver = User.objects.get(id=receiver_id)
logger.info("Starting send email")
Email.send_email(sender, receiver, type, message)
logger.info("Finished send email")

settings.py

....
INSTALLED_APPS = [
'rest_framework',
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'corsheaders',
'myApi',
'celery',
'rest_framework.authtoken',
'rest_framework.renderers',
]
AUTH_USER_MODEL = 'myApi.User'
CELERY_IMPORTS = ('api.myApi.tasks')
....

celerySettings.py

from __future__ import absolute_import, unicode_literals
from django.conf import settings
import os
from celery import Celery

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.api.settings')
app = Celery('api', broker='amqp://')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object(settings, namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

我的 Api/模型.py

from user.models import User

myApi/admin.py

# -*- coding: utf-8 -*-
from __future__ import unicode_literals
from django.contrib import admin
from user.models import User
admin.site.register(User)

API/WSGi.py

import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "api.settings")
application = get_wsgi_application()

任何建议将不胜感激。也很抱歉这篇文章很长,这是我的第一篇,所以不确定需要多少细节。

我发现了我的问题。如果它可以帮助其他卡在这一点上的人,我需要添加该行

sys.path.append(os.path.abspath('api'))

在我的 celerySettings.py 中,要拿起模型。

所以现在看起来像这样

from __future__ import absolute_import, unicode_literals
from django.conf import settings
import os, sys
from celery import Celery
sys.path.append(os.path.abspath('api'))
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'api.api.settings')
app = Celery('api', broker='amqp://')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
#   should have a `CELERY_` prefix.
app.config_from_object(settings, namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

然后,在实际尝试在本地查询模型的数据库时,我遇到了另一个问题。Celery 说我的数据库表不存在,这是因为它正在实际本地数据库文件所在的文件夹上方创建一个新数据库,要修复它,我只需要更改数据库名称

"db.sqlite3"

os.path.join(os.path.dirname(__file__), "db.sqlite3")

在 settings.py

这有效地将其更改为

api/db.sqlite3

对于芹菜

希望这对其他人有所帮助,因为我花了太多时间在这个问题上挣扎。

相关内容

  • 没有找到相关文章

最新更新