我有一个Celery任务,比如:
from celery.task import task
from django.conf import settings
from base.tasks import BaseTask
@task(name="throw_exception", base=BaseTask)
def print_value(*args, **kwargs):
print('BROKER_URL:', settings.BROKER_URL)
我在我的虚拟机里运行一个Celery工作人员:
celery worker -A myproject -l info
工人展示:
Connected to amqp://guest:**@127.0.0.1:5672/myapp
当我从Django shell启动我的任务时:
>>> from django.conf import settings
>>> settings.BROKER_URL
'amqp://guest:**@127.0.0.1:5672/myapp'
>>> from myapp.tasks import print_value
>>> print_value.delay()
我从未在我的员工日志中看到执行的任务。
然而,如果我改为使用默认为"/"vhost的BROKER_URL,那么它会立即执行所有挂起的任务,这意味着即使设置了正确的BROKER_URL,我对print_value.delay()
的所有调用都会将其发送到错误的vhost。我做错了什么?
编辑:问题似乎是Celery没有一个一致的@task装饰器,并且通过使用错误的装饰器,您断开了任务与代理设置的连接。因此,本质上,我的所有任务都配置为使用默认代理,而不是在我的设置中定义的代理。老文档说要使用from celery.task import task
,但新文档。。。并没有真正指定,似乎暗示您应该使用在celery.py
文件中定义的app
实例,如@app.task
。这样做的问题是,我的所有任务都在单独的tasks.py
文件中,它们无法访问app
实例。如果我将一个任务复制到我的celery.py
中并使用@app.task
装饰器,那么它将使用正确的vhost并按预期工作,但很明显,这不是一个实用的修复方法,因为我必须将数十个函数复制到这个文件中。如何正确修复?
我现在使用Django+(Celery+RabbitMQ)也有同样的问题。我的解决方案是
CELERY_BROKER_URL=amqp://<user>:<password>@localhost:5672/<vhost>
以下是RabbitMQ.com>客户端文档>RabbitMQ URI规范
为了它的价值
Celery+RabbitMQ有很多功能。我在看带有rabbitmqctl list_vhosts
-的RabbitMQ,但我看不到我的vhost。WTF最后,我意识到我在本地开发服务器上配置supervisord
太早了。从CLI启动Celery会给出一堆反馈,supervisord
会将这些反馈放在不直接在我眼皮底下的地方,比如:
[2021-02-19 18:26:52,803: WARNING/MainProcess] (0, 0): (403) ACCESS_REFUSED - Login was refused using authentication mechanism AMQPLAIN. For details see the broker logfile.
AMQP
立刻让我想起了连接字符串。繁荣这是你的vhost
。
在仔细研究了Celery的代码后,我能找到的设置当前应用程序的唯一方法就是调用celery._state._set_current_app(app)
。很明显,这是一种内部方法,并不是为了以这种方式使用而设计的,但我找不到任何其他方法来明确地将我的自定义应用程序实例设置为"当前"应用程序。我本以为这应该自动完成,尤其是因为我的代码直接取自教程,所以要么文档不完整,要么这是一个错误。
在任何情况下,工作的芹菜文件看起来像:
from __future__ import absolute_import, print_function
import os
import sys
from celery import Celery
from celery._state import _set_current_app
import django
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
_set_current_app(app)
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings.settings')
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../myproject')))
django.setup()
from django.conf import settings
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
这导致我所有tasks.py文件中的所有@task
装饰器都能正确访问我的自定义Celery实例。
只需提供一个演示,使用经过测试的不同vhost的dj芹菜。
在项目文件夹中,__init__.py
:
from __future__ import absolute_import
# This will make sure the app is always imported when
# Django starts so that shared_task will use this app.
from .celery import app as celery_app
celery.py
,将SchoolMS
替换为您自己的项目标签:
from __future__ import absolute_import
import os
from celery import Celery, platforms
from django.conf import settings
# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'SchoolMS.settings')
app = Celery('SchoolMS')
platforms.C_FORCE_ROOT = True
# Using a string here means the worker will not have to
# pickle the object when using Windows.
app.config_from_object('django.conf:settings')
app.autodiscover_tasks(lambda: settings.INSTALLED_APPS)
settings.py
:
BROKER_URL = 'amqp://schoolms:schoolms@localhost:5672/schoolms'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERYBEAT_SCHEDULER = 'djcelery.schedulers.DatabaseScheduler'
CELERYBEAT_SCHEDULE = {
}
user/tasks.py
:
from celery import task
from django.conf import settings
@task
def send_tel_verify(tel_verify_id):
try:
tel_verify = TelVerify.objects.get(id=tel_verify_id)
try:
send_sms(tel_verify.tel, 'xxxx')
return ''success'
except SmsError as e:
return 'error'
except ObjectDoesNotExist:
return 'not found'
user/views.py
send_tel_verify.delay(tel_verify.id)