我尝试在 Django 中为 Celery 配置三个队列/工作线程。
settings.py
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_QUEUES = (
Queue('manually_task', Exchange('manually_task'), routing_key='manually_task'),
Queue('periodically_task', Exchange('periodically_task'), routing_key='periodically_task'),
Queue('firsttime_task', Exchange('firsttime_task'), routing_key='firsttime_task'),
)
CELERY_ROUTES = {
'api.tasks.manually_task': {
'queue': 'manually_task',
'routing_key': 'manually_task',
},
'api.tasks.periodically_task': {
'queue': 'periodically_task',
'routing_key': 'periodically_task',
},
'api.tasks.firsttime_task': {
'queue': 'firsttime_task',
'routing_key': 'firsttime_task',
},
}
我有三个任务,每个任务都应该有自己的队列/工作线程。 我的任务如下所示:
@shared_task
def manually_task(website_id):
print("manually_task");
website = Website.objects.get(pk=website_id)
x = Proxy(website, "49152")
x.startproxy()
x = None
@periodic_task(run_every=(crontab(hour=19, minute=15)), ignore_result=True)
def periodically_task():
websites = Website.objects.all()
for website in websites:
x = Proxy(website, "49153")
x.startproxy()
x = None
@shared_task
def firsttime_task(website_id):
website = Website.objects.get(pk=website_id)
x = Proxy(website, "49154")
x.startproxy()
x = None
现在对于第一次试用,我只启动一个工人:
celery -A django-proj worker -Q manually_task -n manually_task
我的问题是任务显然没有执行,"manually_task"没有打印。 为什么它不起作用?
根据这些建议,我建议您在从manually_task.apply_async((webseite.pk,), queue='manually_task')
等视图中调用任务时提供队列名称,或者在启动工作线程时添加名为celery
的默认队列,如celery -A django-proj worker -Q manually_task,celery