我有两个Django Celery代码实例在两个不同的服务器上运行,以便冗余访问另一台服务器上的公共数据库。我注意到当用户提交作业时,芹菜在两台服务器上同时启动相同的任务。这将创建争用条件并更新数据库两次。如何通过将任务保存在一台服务器中而另一个类似的任务在另一台服务器中启动来防止这种情况?
您需要创建一个锁以防止两个任务同时执行,芹菜文档中有一个页面 http://ask.github.io/celery/cookbook/tasks.html 其中包含如何执行此操作的示例。 小心你的实现不会卡在某种死锁中,并且你在锁上设置了超时,这样如果工作线程崩溃,它不会无限期地保持锁。
# Example from the link above
from celery.task import Task
from django.core.cache import cache
from django.utils.hashcompat import md5_constructor as md5
from djangofeeds.models import Feed
LOCK_EXPIRE = 60 * 5 # Lock expires in 5 minutes
class FeedImporter(Task):
name = "feed.import"
def run(self, feed_url, **kwargs):
logger = self.get_logger(**kwargs)
# The cache key consists of the task name and the MD5 digest
# of the feed URL.
feed_url_digest = md5(feed_url).hexdigest()
lock_id = "%s-lock-%s" % (self.name, feed_url_hexdigest)
# cache.add fails if if the key already exists
acquire_lock = lambda: cache.add(lock_id, "true", LOCK_EXPIRE)
# memcache delete is very slow, but we have to use it to take
# advantage of using add() for atomic locking
release_lock = lambda: cache.delete(lock_id)
logger.debug("Importing feed: %s" % feed_url)
if acquire_lock():
try:
feed = Feed.objects.import_feed(feed_url)
finally:
release_lock()
return feed.url
logger.debug(
"Feed %s is already being imported by another worker" % (
feed_url))
return
此模式需要用于获取锁的缓存服务器。 当任务启动时,它将根据键(例如"my_task"(在缓存服务器中获取一个锁,然后在任务完成时,它将释放该锁。 启动的任何其他任务可能都应该有一个 while
循环,该循环等待,直到它可以获取锁。 Redis 锁是原子的,这意味着获取锁的操作不会同时发生,只有一个任务能够获取锁。