我将 Django 与 Celery 和 Redis 一起使用来处理异步任务。 我定义了三个任务,它们应该在您自己的队列中运行。
我的项目结构如下所示:
django-project
|- api
|- task.py
|- view.py
|- django-project
|- settings.py
|- celery.py
|- __init__.py
我在 api 应用中的 task.py 中定义的任务:
@shared_task
def manually_task(website_id):
print("manually_task");
website = Website.objects.get(pk=website_id)
x = Proxy(website, "49152")
x.startproxy()
x = None
@periodic_task(run_every=(crontab(hour=19, minute=15)), ignore_result=True)
def periodically_task():
websites = Website.objects.all()
for website in websites:
x = Proxy(website, "49153")
x.startproxy()
x = None
@shared_task
def firsttime_task(website_id):
website = Website.objects.get(pk=website_id)
x = Proxy(website, "49154")
x.startproxy()
x = None
现在这是我的初始化.py
__all__ = ('celery_app',)
以及 settings.py 中的芹菜设置:
CELERY_BROKER_URL = 'redis://localhost:6379'
CELERY_RESULT_BACKEND = 'redis://localhost:6379'
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Europe/Berlin'
CELERY_DEFAULT_QUEUE = 'red'
CELERY_TASK_QUEUES = (
Queue('red', Exchange('red'), routing_key='red'),
)
CELERY_ROUTES = {
'api.tasks.manually_task': {'queue': 'red'},
}
我的 celery.py 如下所示:
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'django-project.settings')
app = Celery('django-project')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
这是我的设置。现在我开始所有需要的东西(自己终端中的每一行(:
redis-server
celery -A django-project worker -Q red
python3 manage.py runserver 0.0.0.0:8000
一切开始都没有问题。在视图中,我像这样调用任务:manually_task.delay(webseite.pk)
但在工人身上什么都不做。 如果我在没有CELERY_TASK_QUEUES
的情况下尝试此操作,settings.py
中CELERY_DEFAULT_QUEUE
和CELERY_ROUTES
设置,并celery -A django-project worker
正常启动工人,它工作正常。 我做错了什么?
manually_task.delay(webseite.pk)
会将任务发送到默认队列。由于您的工作线程订阅了red
队列,因此我假设没有工作人员订阅了默认队列,因此任务不会执行。
请尝试以下操作:manually_task.apply_async(webseite.pk, queue="red")