我试图用芹菜创建一个进度条。不幸的是,我的辅助函数没有被调用。
def checkconfig_post_new (request):
job = do_work.delay()
print ("Work started")
return HttpResponseRedirect ( reverse ( 'poll_state' ) + '?job=' + job.id )
# this decorator is all that's needed to tell celery this is a worker task
@task()
def do_work():
for i in range(10):
sleep(0.1)
print (i)
current_task.update_state(state='PROGRESS',
meta={'current': i, 'total': 10})
打印语句"工作已开始"将永远不会打印出来。此外,"i"永远不会被打印出来。
看来,这个程序将停止工作。
这是完整的代码:
芹菜.py
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'devadmin.settings')
app = Celery('devadmin')
# Using a string here means the worker doesn't have to serialize
# the configuration object to child processes.
# - namespace='CELERY' means all celery-related configuration keys
# should have a `CELERY_` prefix.
app.config_from_object('django.conf:settings', namespace='CELERY')
# Load task modules from all registered Django app configs.
app.autodiscover_tasks()
@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))
__init__.py
from __future__ import absolute_import, unicode_literals
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
__all__ = ('celery_app',)
视图.py
from celery import task, current_task
from celery.result import AsyncResult
from time import sleep
def checkconfig_post_new (request):
job = do_work.delay()
print ("Work started")
return HttpResponseRedirect ( reverse ( 'poll_state' ) + '?job=' + job.id )
任务.py
@task()
def do_work():
for i in range(10):
sleep(0.1)
print (i)
current_task.update_state(state='PROGRESS',
meta={'current': i, 'total': 10})
首先,检查您的工作人员的注册任务(启动2次,有时在第一次打印时清空(。
from celery.task.control import inspect
i = inspect()
i.registered_tasks()
第二,你需要把你的任务放在tasks.py
中,因为autodiscover_tasks
搜索这个模式。