如何为每个Django模型实例调度周期性的芹菜任务



我在数据库中有一堆Feed对象,我试图让每个Feed每小时更新一次。我的问题是,我需要确保没有任何重复的更新——它需要发生不超过每小时一次,但我也不希望feed等待两个小时更新。(如果每小时+/-几分钟发生一次还可以,但几分钟内发生两次就不好了。)

我使用Django和芹菜与Amazon SQS作为代理。我把feed更新代码设置为一个芹菜任务,但我没能找到一种方法来防止重复,同时保持与运行在多个节点上的芹菜的兼容性。

我目前的解决方案是向Feed模型添加一个last_update_scheduled属性,并每5分钟运行以下任务(伪代码):

threshold = datetime.now() - timedelta(seconds=3600)
for f in Feed.objects.filter(Q(last_update_scheduled__lt = threshold) |
                             Q(last_update_scheduled = None)):
    updateFeed.delay(f)
    f.last_update_scheduled = now
    f.save()

这容易受到许多同步问题的影响。例如,如果我的任务队列得到备份,则此任务可能同时运行两次,从而导致重复更新。我已经看到了一些解决方案(像芹菜的食谱和Stack Overflow的改编),但是memcached的解决方案是不可靠的,例如,重复可能发生在重启memcached时,或者如果它碰巧用完内存和清除旧数据。更不用说,为了一个简单的锁,我不愿意在我的生产配置中添加memcached。

在一个完美的世界里,我希望我能说:

@modelTask(Feed, run_every=3600)
def updateFeed(feed):
    # do something expensive

需要说明的是,芹菜食谱本身并没有使用memcached,而是使用了Django的缓存中间件。还有许多其他的缓存方法可以满足您的需求,而且没有memcached的缺点。

最新更新