这是一个后续问题。
Django 1.3.1,芹菜2.2.7,蟒蛇2.6。
我在fruits/models.py
中有以下内容:
考虑模型:
class Fruit(models.Model):
name = models.CharField(max_length=50)
def __unicode__(self):
return self.name
以及fruits/tasks.py
中的以下内容:
from django.dispatch import receiver
from django.db.models import signals
from celery.task import periodic_task, task
import fruits.models as m
import time
@task()
def check_fruit(id):
time.sleep(2)
try:
fruit = m.Fruit.objects.get(pk=id)
print "Fruit %s is found!" % fruit.name
except m.Fruit.DoesNotExist:
print "no such fruit"
@receiver(signals.pre_save, sender=m.Fruit, dispatch_uid="on_fruit_save")
def on_custom_feed_save(sender, instance, **kwargs):
check_fruit.apply_async(args=[instance.id])
我启动芹菜守护程序,然后打开django-shell并键入:
import fruits.tasks;
import fruits.models as m;
m.Fruit(name="plum").save()
问题:我本以为任务会找到结果,但从来没有。为什么?
(我故意从预保存信号启动任务,以模拟在大型系统上发生的问题)。
老问题其实很简单,问题不是比赛条件或bug等。
该示例从头开始创建新对象,并且在pre_save信号期间该对象尚未存储在数据库中。因此,没有设置"instance.id",对于新创建的对象,它是None。
check_fruit.delay(None)
由pre_save信号调用并产生
fruit = m.Fruit.objects.get(pk=None)
DB中没有任何主键为null的行。
Celery任务实际上是在检查这个查询中是否有新创建的对象。查询总是按预期抛出异常。
使用postrongave信号,并为新对象检查创建的参数。
使用postsave后,可能会出现与事务相关的race条件。大多数时候你不会在意,任务重试大多有效。但是,如果您关心竞争条件,请查看新的django功能,on_commit hook
本周我自己也遇到了一个非常类似的问题,只是我在没有Django的情况下使用了Celery。我发现sender参数只有在任务的实例被传递给它时才起作用,而不仅仅是对sender类本身的引用。
我对这个问题进行了一些研究,发现sender参数被芹菜用来比较特定任务的id和信号的注册发件人(设置为过滤特定发件人)
在celener/utils/dispatch/signals.py模块中,以下执行此评估:
def _make_id(target): # pragma: no cover
if hasattr(target, 'im_func'):
return (id(target.im_self), id(target.im_func))
return id(target)
首先获取某个目标对象的id。当在装饰器中指定发送方时,它被保存在类似于以下的元组中:
lookup_key = (_make_id(receiver), _make_id(sender))
稍后,当任务激发时,会调用_live_receivers
方法,该方法本质上执行评估,以查看lookup_key中指定的目标发送方是否与当前发送方(激发任务)的id匹配:
def _live_receivers(self, senderkey):
"""Filter sequence of receivers to get resolved, live receivers.
This checks for weak references and resolves them, then returning only
live receivers.
"""
none_senderkey = _make_id(None)
receivers = []
for (receiverkey, r_senderkey), receiver in self.receivers:
if r_senderkey == none_senderkey or r_senderkey == senderkey:
if isinstance(receiver, WEAKREF_TYPES):
# Dereference the weak reference.
receiver = receiver()
if receiver is not None:
receivers.append(receiver)
else:
receivers.append(receiver)
return receivers
现在我遇到的问题是,即使我指定了要接收信号的任务,但当它被触发时,发送方密钥始终不匹配。
我能正确工作的唯一方法是在我为特定任务本身构建的抽象任务类的__init__
方法中注册信号。通过这样做,我能够向Celery传递它为任务注册的确切实例(self
)。
我不知道这个问题是与我的逻辑或对信号如何工作的理解有关,还是与Celery中的错误有关,但我知道通过任务实例解决了这个问题,此后一切都很好。
需要注意的一些事项:
- 我没有使用Django,因此也没有使用Dyango-Celery扩展
- 我不确定问题出在Celery身上(更有可能是我在逻辑上犯了错误或误解了什么)
- 我不知道这种情况是否适用于您,因为我不确定Django Celery是如何改变Celery后端的工作方式的
尽管如此,我希望这是有益的。
祝你好运!
老问题现在可能已经解决了,但对于未来的访客来说。。。
听起来芹菜工作程序打开了一个长时间运行的事务,如果它从未提交,它就永远看不到新对象。在任务中运行数据库查询之前,请尝试添加此项:
from django.db import transaction
transaction.commit()
请注意,Django 1.6对事务管理进行了一些更改,但截至本文撰写之时,它还没有发布。