我正在尝试将Django Signals的postrongave功能与芹菜任务相结合。在一个新的Message对象被保存到数据库后,我想评估实例是否有两个属性之一,如果有,调用'send_sms_function',这是一个芹菜注册的任务。
tasks.py
from my_project.celery import app
@app.task
def send_sms_message(message):
# Do something
signals.py
from django.db.models.signals import post_save
from django.dispatch import receiver
import rollbar
rollbar.init('234...0932', 'production')
from dispatch.models import Message
from comm.tasks import send_sms_message
@receiver(post_save, sender=Message)
def send_outgoing_messages(sender, instance, **kwargs):
if instance.some_attribute == 'A' or instance.some_attribute == 'B':
try:
send_sms_message.delay(instance)
except:
rollbar.report_exc_info()
else:
pass
我通过运行一个Celery worker在本地测试这个。当我在Django shell中调用芹菜函数时,它像预期的那样工作。但是,当我将Message实例保存到数据库时,该函数确实不按预期工作:没有任何内容发布到任务队列,并且我没有看到任何错误消息。
我做错了什么?
这看起来像是序列化和/或设置的问题。当芹菜将消息传递给代理时,它需要有数据的某种表示形式。芹菜序列化你给任务的参数,但如果你没有将它与你传递的参数配置一致(例如,你的代理期望JSON,但你向它发送了一个pickle python对象),任务可能会失败,因为工作器无法轻松解码你发送的内容。如果你在shell中运行这个函数(没有调用delay),它将被同步调用,因此没有序列化或消息传递。
在你的设置中,你应该使用JSON序列化(除非你有一个很好的理由),但如果没有,那么你的酸洗可能有问题。当您运行芹菜时,您总是可以增加日志级别以调试,以查看有关序列化相关错误的更多信息:
celery -A yourapp worker -l debug
当有疑问时,使用print语句/函数来确保信号接收器正在运行。如果没有,你可以创建一个AppConfig
类,在它的ready
方法中导入你的接收器,或者其他一些合理的技术来确保你的接收器正在被注册。
[意见]我建议这样做:
@receiver(post_save, sender=Message)
def send_outgoing_messages(sender, instance, **kwargs):
enqueue_message.delay(instance.id)
in yourmodule/tasks.py
@app.task
def enqueue_message(message_id):
msg = Message.object.get(id=message_id)
if msg.some_attribute in ('A', 'B'): # slick or
send_sms_message.delay(message_id)
你总是可以使用芹菜的组合技术,但在这里你有一些不会增加你的请求/响应周期的复杂性。[/意见]
表达式if instance.some_attribute == 'A' or 'B'
可能是您的问题。
你的意思可能是:
if instance.some_attribute == 'A' or instance.some_attribute == 'B'
或者我怎么写:
if instance.some_attribute in ('A', 'B')
您正在同步调用函数而不是排队:
send_sms_message.delay(instance)
应该将消息排队
http://celery.readthedocs.org/en/latest/reference/celery.app.task.html celery.app.task.Task.delay
http://celery.readthedocs.org/en/latest/userguide/calling.html基础知识
@dgel还指出了一个逻辑错误