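I am trying to make Celery suitable for high availability. I have forked the django_celery project and this celery fork so that I can make the customizations I need. The celery link shows the modifications to beat.py, which use the code below.
I have added this Lock model to the django_celery models.py file, and it migrates fine: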
from django.db import models


@python_2_unicode_compatible
class Lock(models.Model):
    name = models.CharField(max_length=127, unique=True)
    created = models.DateTimeField(auto_now=True)

    class Meta:
        verbose_name_plural = _('locks')

    def __str__(self):
        return self.name
In celery, in the utils folder, I added this locked.py file:
from djcelery.models import Lock
from datetime import datetime, timedelta
from django.db import transaction, IntegrityError


class Locked(object):
    """A context manager to add a distributed mutex."""

    def __init__(self, name, timeout):
        self.name = name
        self.lock = None
        self.timeout = timeout

    def __enter__(self):
        # first delete any expired locks
        expired = datetime.utcnow() - timedelta(seconds=self.timeout)
        Lock.objects.filter(name=self.name, created__lte=expired).delete()
        # then try to get the lock
        try:
            Lock(name=self.name).save()
        except IntegrityError:
            transaction.rollback()
            raise LockError('Could not acquire lock: {0}'.format(self.name))

    def __exit__(self, *args):
        Lock.objects.filter(name=self.name).delete()


class LockError(Exception):
    """Exception thrown when the requested lock already exists."""
    pass
With these changes in place, I can run the following commands:
celery worker
python manage.py runserver
python manage.py shell
The problem appears when I try to run the scheduler:
celery beat
I get the following error:
Traceback (most recent call last):
  File "venv/bin/celery", line 11, in <module>
    load_entry_point('celery', 'console_scripts', 'celery')()
  File "/venv/src/celery/celery/__main__.py", line 30, in main
    main()
  File "/venv/src/celery/celery/bin/celery.py", line 81, in main
    cmd.execute_from_commandline(argv)
  File "/venv/src/celery/celery/bin/celery.py", line 793, in execute_from_commandline
    super(CeleryCommand, self).execute_from_commandline(argv)))
  File "/venv/src/celery/celery/bin/base.py", line 311, in execute_from_commandline
    return self.handle_argv(self.prog_name, argv[1:])
  File "/venv/src/celery/celery/bin/celery.py", line 785, in handle_argv
    return self.execute(command, argv)
  File "/venv/src/celery/celery/bin/celery.py", line 717, in execute
    ).run_from_argv(self.prog_name, argv[1:], command=argv[0])
  File "/venv/src/celery/celery/bin/base.py", line 315, in run_from_argv
    sys.argv if argv is None else argv, command)
  File "/venv/src/celery/celery/bin/base.py", line 377, in handle_argv
    return self(*args, **options)
  File "/venv/src/celery/celery/bin/base.py", line 274, in __call__
    ret = self.run(*args, **kwargs)
  File "/venv/src/celery/celery/bin/beat.py", line 72, in run
    beat = partial(self.app.Beat,
  File "/venv/lib/python2.7/site-packages/kombu/utils/__init__.py", line 325, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/venv/src/celery/celery/app/base.py", line 572, in Beat
    return self.subclass_with_self('celery.apps.beat:Beat')
  File "/venv/src/celery/celery/app/base.py", line 504, in subclass_with_self
    Class = symbol_by_name(Class)
  File "/venv/lib/python2.7/site-packages/kombu/utils/__init__.py", line 96, in symbol_by_name
    module = imp(module_name, package=package, **kwargs)
  File "/usr/local/Cellar/python/2.7.13/Frameworks/Python.framework/Versions/2.7/lib/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
  File "/venv/src/celery/celery/apps/beat.py", line 19, in <module>
    from celery import VERSION_BANNER, platforms, beat
  File "/venv/src/celery/celery/beat.py", line 35, in <module>
    from .utils.locked import Locked, LockError
  File "/venv/src/celery/celery/utils/locked.py", line 1, in <module>
    from djcelery.models import Lock
  File "/venv/src/django-celery/djcelery/models.py", line 30, in <module>
    class TaskMeta(models.Model):
  File "/venv/lib/python2.7/site-packages/django/db/models/base.py", line 105, in __new__
    app_config = apps.get_containing_app_config(module)
  File "/venv/lib/python2.7/site-packages/django/apps/registry.py", line 237, in get_containing_app_config
    self.check_apps_ready()
  File "/venv/lib/python2.7/site-packages/django/apps/registry.py", line 124, in check_apps_ready
    raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.
I have djcelery in my INSTALLED_APPS setting, so I don't understand what is going wrong here.
You must specify the app instance to use for the celery command:
-A APP, --app=APP    app instance to use (e.g. module.attr_name)
For example, if I have the structure
pybilling
- pybilling
- celeryconfig.py
then I should start celery beat with the command
celery --app pybilling.celeryconfig:app beat
Here is the content of celeryconfig.py:
from __future__ import absolute_import
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'pybilling.settings')
from django.conf import settings # noqa
app = Celery('pybilling')
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
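To make the `module:attr` form of the --app value a bit more concrete, here is a rough sketch of how such a string can be resolved into an object. This is only an illustration and not Celery's actual implementation; `resolve_symbol` is a hypothetical helper name. The point is that resolving `pybilling.celeryconfig:app` imports the module first, so its top-level code (including the `os.environ.setdefault` line) runs before the app object is handed to the command.

```python
import importlib


def resolve_symbol(path):
    """Resolve 'package.module:attr' (or just 'package.module') to an object.

    Hypothetical helper for illustration only; it is not part of Celery.
    """
    module_name, sep, attr = path.partition(':')
    # Importing the module executes its top-level statements.
    module = importlib.import_module(module_name)
    # Without a ':attr' part, hand back the module itself.
    return getattr(module, attr) if sep else module


# Example with a standard-library module instead of a real project:
join = resolve_symbol('os.path:join')
```

With a real project the same idea would apply to `pybilling.celeryconfig:app`, assuming that module is importable from the directory where the command is run.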
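Celery can be used together with Django and is compatible with it, but it is not inherently a Django application. The modification you made means that Django models get loaded when you run celery beat. Before those models can be used, the applications must be initialized. The standard way to do this is to call django.setup() after arranging things so that Django's code can find the Django settings. It could look like this: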
import os
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "project.settings")
django.setup()
You need to change project.settings to the actual name of the module that contains your settings.
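If it helps, the same steps could be wrapped in a small helper that runs before anything imports djcelery.models. This is a minimal sketch under some assumptions: the helper names `configure_settings_module` and `ensure_django_ready` are made up for illustration, and "project.settings" is still a placeholder. Note that `os.environ.setdefault` only sets the variable when it is not already defined, so a value exported in the shell wins over the default.

```python
import os


def configure_settings_module(default="project.settings"):
    """Set DJANGO_SETTINGS_MODULE unless it is already set.

    Returns the value that is in effect afterwards.
    Hypothetical helper; "project.settings" is a placeholder.
    """
    return os.environ.setdefault("DJANGO_SETTINGS_MODULE", default)


def ensure_django_ready(default="project.settings"):
    """Point Django at the settings, then load the app registry."""
    configure_settings_module(default)
    # Imported here so that merely importing this file does not need Django.
    import django
    django.setup()


# Intended use, before any `from djcelery.models import Lock`:
#     ensure_django_ready("project.settings")
```

Where exactly to call it is a judgment call. One option, assuming you are free to edit your fork, is to call it near the top of locked.py, ahead of the model import.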