如何在django设置中配置芹菜_schedule



我可以将其作为独立应用程序运行,但是我很难在Django中使用它。

这是独立的代码:

from celery import Celery
from celery.schedules import crontab

app = Celery('tasks')
app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_RESULT_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],
    CELERY_TIMEZONE='US/Central',
    CELERY_ENABLE_UTC=True,
    CELERYBEAT_SCHEDULE = {
    'test': {
        'task': 'tasks.test',
        'schedule': crontab(),
        },
    }
)
@app.task
def test():
    with open('test.txt', 'a') as f:
        f.write('Hello, World!n')`

它为RabbitMQ服务器提供馈送,并每分钟写入文件。它像魅力一样工作,但是当我尝试使其在Django中工作时,我会得到此错误:

您是否记得导入包含此任务的模块?也许您正在使用相对进口?请参阅____ 更多信息。

消息主体的完整内容是:{'retries':0,'eta':无, 'kwargs':{},'taskset':none,'timelimit':[none,none],'callbacks': 无,'task':'proj.test','args':[],'expires':none,'id': '501CA998-B5EB-4BA4-98A8-AFABDA9E88DD','utc':true,'errbacks':无, '和弦':none}(246b)追溯(最新呼叫上次):文件 "/home/user/celerydjango/venv/lib/python3.5/site-packages/celery/worker/consumer.py", 第456行,在on_task_received中 策略[name](消息,正文,键盘:'proj.test'[2016-06-16 01:16:00,051:info/beat]调度程序:发送应有的任务测试(proj.test) [2016-06-16 01:16:00,055:错误/mainprocess]未注册 类型" proj.test"的任务。

这是我在django中的代码:

# CELERY STUFF
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'US/Central'
CELERYBEAT_SCHEDULE = {
    'test': {
        'task': 'proj.test',
        'schedule': crontab(),
    }
}

芹菜.py

from __future__ import absolute_import
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
from django.conf import settings  # noqa
app = Celery('proj')
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)

@app.task(bind=True)
def debug_task(self):
    print('Request: {0!r}'.format(self.request))

task.py

from __future__ import absolute_import
from celery import shared_task

@shared_task
def test():
    with open('test.txt', 'w') as f:
        print('Hello, World', file=f)

init.py

from __future__ import absolute_import
from .celery import app as celery_app 

对此有任何想法。谢谢。

为什么不尝试以下内容,让我知道它是否适合您。它确实对我有用。

在设置中。py

CELERYBEAT_SCHEDULE = {
    'my_scheduled_job': {
        'task': 'run_scheduled_jobs', # the same goes in the task name
        'schedule': crontab(),
    },
}

和tasks.py ..

from celery.task import task # notice the import of task and not shared task. 
@task(name='run_scheduled_jobs') # task name found! celery will do its job
def run_scheduled_jobs():
    # do whatever stuff you do
    return True

但是,如果您正在寻找共享_task,则..

@shared_task(name='my_shared_task') # name helps celery identify the functions it has to run
def my_shared_task():
    # do what you want here..
    return True

我将共享任务用于异步工作。.因此,我需要从以下功能中调用它。

在views.py/或where.py.py中

中的py
def some_function():
    my_shared_task.apply_async(countdown= in_seconds)
    return True

,以防万一,如果您忘记了,请记住包括您试图运行任务的应用程序。

INSTALLED_APPS = [...
 'my_app'...
] # include app

我敢肯定这种方法可以正常工作..谢谢

相关内容

  • 没有找到相关文章

最新更新