我试图使用Celery定期查询外部api,并用新数据更新Django项目中的数据库。Celery正确地安排任务并将其发送给芹菜工作者,但它从不执行任何操作。
这是我的celery.py文件,它与我的settings.py:处于同一级别
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from django.conf import settings
from celery.schedules import crontab
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "data_analysis.settings")
app = Celery("data_analysis")
app.conf.enable_utc = False
app.conf.update(timezone= 'Asia/Kolkata')
app.config_from_object(settings, namespace="CELERY")
app.autodiscover_tasks()
app.conf.beat_schedule = {
'update_all_api':{
'task':'operations.tasks.celery_update',
'schedule': 30.0,
}
}
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
这是我的任务.py,它在一个名为"操作"的应用程序中:
from celery import shared_task
import time
from operations.models import Machine, MachineData
from operations.analytics.processing import get_data
@shared_task(bind=True)
def celery_update(self):
print('Running Update')
print('The time is :' + str(time.asctime(time.localtime(time.time()))))
print('Getting Data ...')
all_machines = Machine.objects.all()
for machine in all_machines:
data_list = get_data(machine.api_url)
for data in data_list:
# print(data)
if not MachineData.objects.filter(data=data):
print('New Data received for :' + str(machine))
MachineData.objects.create(machine=machine, data=data)
else:
print('No new data for : ' + str(machine))
return 'done'
调度程序将任务发送给芹菜工作程序,但它从不执行该函数。
这是设置、目录结构和终端的屏幕截图。设置
如果有人能指出我的错误,那将是非常有帮助的。
除了运行芹菜节拍来执行生产任务外,您还应该运行一个芹菜工人,如下所示:
celery -A data_analysis worker -l info
芹菜节拍产生任务,然后,芹菜工人将执行任务。