注册celeni定期任务,以便在django-admin中查看它



我正试图在Django和Celery中编写一些周期性后台任务

我的任务正在运行好吧,当使用节拍调度器启动我的芹菜工作程序时,我设法定期启动这些任务:
celery -A parifex worker -B -l info
工作程序发现的任务

正如您在上图中看到的,我的任务被发现得很好。但是,INSTALLED_APPS上的自动发现没有按预期运行,所以我必须包括我想在Celery实例化中查找任务的应用程序的路径。

要求

pip freeze |grep "celery|Django"
celery==4.4.1
Django==3.1.2
django-celery==3.3.1

进入设置所在的基本应用程序:

celery.py

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'parifex.settings')
# Define the Celery configuration and include <app>.tasks to discover it
app = Celery('parifex', include=['device.tasks'])
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS, force=True)

@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')

设置.py

djcelery.setup_loader()
CELERYD_HIJACK_ROOT_LOGGER = False
# http://celery.readthedocs.org/en/latest/configuration.html#celery-redirect-stdouts-level
CELERY_REDIRECT_STDOUTS = True  # par défaut
CELERY_REDIRECT_STDOUTS_LEVEL = 'DEBUG'
CELERY_BROKER_URL = 'redis://:Redi$DB_withCelery@127.0.0.1:6234'
# store schedule in the DB:
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERY_RESULT_BACKEND = 'redis://:Redi$DB_withCelery@127.0.0.1:6234'
CELERY_TASK_SERIALIZER = 'json'
CELERY_ACCEPT_CONTENT = ['json']  # Ignore other content
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Paris'
CELERY_ENABLE_UTC = True
CELERY_TRACK_STARTED = True
CELERYD_POOL_RESTARTS = True

进入名为Device的模块应用程序

tasks.py

from celery import shared_task
from celery.schedules import crontab
from celery.task import periodic_task
from django.core.serializers import serialize
from djcelery.schedulers import DatabaseScheduler, ModelEntry
from .models import Device
from .api.tools import TestDeviceApi

@shared_task
def count_devices():
return Device.objects.count()

@shared_task(serializer='json', bind=True)
def get_devices_state():
device_status = {}
print(serialize('json', Device.objects.all()))
for device in Device.objects.all():
status = TestDeviceApi(device).ping()
device_status[device.id] = (serialize('json', [device]), status)
return device_status

@periodic_task(run_every=(crontab(minute='*/1')))
def print_random():
print(15)

好吧,正如我所说,我的工作人员发现任务很好,但我在Django中的管理页面没有看到。

第一个问题:当按页面创建定期任务时,如何注册我的任务以便在管理页面中查看它?还是没用?注册任务我的管理页面可以看到

第二个问题:在没有任何其他模块安装的情况下,是否可以在管理页面中看到硬编码的定期任务?有没有"内置"的方法可以做到这一点?或者,每次调用任务时,我可能不得不尝试自己保存它吗?

我的目标是让用户能够使用默认行为自行更改任务的调度。

提前感谢您的帮助!我希望我是清白的。

我也遇到了同样的问题。

我通过将这些行添加到proj/proj/__init__.py 来修复它

from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app
__all__ = ('celery_app',)

不确定这是最好的解决方案还是只是一个变通方案,但显然这将确保在Django开始使用shared_task时导入应用程序。

相关内容

  • 没有找到相关文章

最新更新