所以我们有这个Django项目,有多个应用程序,我们使用芹菜来完成任务。我们遇到的问题是,只有单个应用程序 tasks.py 内的任务才会运行,其他应用程序中的其他 tasks.py 任务会返回以下错误:
celery_1 | [2018-10-22 08:27:59,563: ERROR/MainProcess] Received unregistered task of type 'biko.supplier.tasks.test_task'.
celery_1 | The message has been ignored and discarded.
celery_1 |
celery_1 | Did you remember to import the module containing this task?
celery_1 | Or maybe you're using relative imports?
celery_1 |
celery_1 | Please see
celery_1 | http://docs.celeryq.org/en/latest/internals/protocol.html
celery_1 | for more information.
celery_1 |
celery_1 | The full contents of the message body was:
celery_1 | b'[[], {}, {"callbacks": null, "errbacks": null, "chain": null, "chord": null}]' (77b)
celery_1 | Traceback (most recent call last):
celery_1 | File "/usr/local/lib/python3.6/site-packages/celery/worker/consumer/consumer.py", line 557, in on_task_received
celery_1 | strategy = strategies[type_]
celery_1 | KeyError: 'biko.supplier.tasks.test_task'
当我运行 test_task.delay(( 时会发生这种情况
以下是供应商 tasks.py:
from config.celery import app
@app.task(shared=True)
def test_task():
print("Runnign this task correctly")
以下是商店 tasks.py 的一部分,其中任务确实可以正常工作:
from django.contrib.contenttypes.models import ContentType
from config.celery import app
from celery_once import QueueOnce
from django.core.management import call_command
from django.utils import timezone
from raven.contrib.django.raven_compat.models import client
from biko.shop.models import Shop
from config.settings import MAX_INCOMING_BUFFER_RETRIES
from biko.buffer.models import IncomingBuffer, OutgoingBuffer
@app.task(shared=True)
def buffer_products(shop_id):
shop = Shop.objects.get(id=shop_id)
shop.get_manager().buffer_products()
这是芹菜配置:
import os
from celery import Celery
from celery.schedules import crontab
from django.conf import settings
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")
app = Celery('biko')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks()
app.conf.ONCE = {
'backend': 'celery_once.backends.Redis',
'settings': {
'url': 'redis://redis',
'blocking': True,
'default_timeout': 60 * 60,
'blocking_timeout': 86400
}
}
任何不属于商店/任务的任务.py都不会显示为正在加载。我不知道为什么它会从商店/任务加载任务.py而不是从另一个应用程序加载任务。
在芹菜配置中;您可以执行以下操作:
# Where app_module represents where tasks exists.
app = Celery('biko', include=['app_module.tasks'])
# Your line should also work, but sometimes it needs the apps configs
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)