芹菜排队等待其他网站上的动作



嘿,我有一个问题,需要一些建议。我做了一些研究,但没有发现任何东西,这说明了我的问题。

我想开发的流程:

  1. 许多用户可以触发动作,这将一些条目添加到我的队列(RabbitMQ)
  2. 队列任务应该获取登录和密码(从数据库[可以是许多帐户])为一个网站,并执行一些操作。这些操作返回某种结果,并与使用的帐户一起保存在数据库中。
  3. 另一个定时任务(每5分钟)应该获取保存的数据库条目并再次登录网站并执行一些其他操作。

问题:每个登录名/密码组合只能同时登录。如果一个组合在同一时间被使用了不止一次,那么任务将会互相注销。

我一直以来的想法:

  1. 为每个登录/密码组合创建自己的队列,并为其创建一个worker,该worker一次只处理一个条目。
  2. 锁定数据库中的行
对于这个问题你有什么建议或解决办法吗?

我使用的框架是:Django作为主要的应用程序框架,芹菜+RabbitMQ作为我的队列系统。

Edit1:
http://docs.celeryproject.org/en/latest/tutorials/task-cookbook.html ensuring-a-task-is-only-executed-one-at-a-time
也许这篇文章能帮到我。也许设置一个缓存键会有所帮助。但是,如果所有的登录/密码组合都被"阻塞",则任务将遇到一些死锁问题。

使用某种锁。

如果你能容忍偶尔的故障,那么你可以使用memcached或redis之类的东西来做一个快速而肮脏的分布式锁定。或者,如你所说,你的数据库。

我不知道我是否会在数据库中锁定行-我个人会使用Redis,只是创建一个映射

www.whatever.com:username:hash(password) -> worker hostname:worker pid:start time

,然后你的工作人员可以检查一个特定的网站/用户名/密码组合是否被锁定。

使用flower,你还可以检查已经锁定了一组特定凭据的工作线程是否仍然处于活动状态。

将任务封装在try/except/finally块中,以确保在每个任务结束时释放锁,即使它失败

import redis
r = redis.StrictRedis()
class MyTask(Task):
    __call__(site, username, password):
       key = '%s:%s:%s' % (site, username, hash(password) 
       val = '%s:%s:%s' % (datetime.now().isoformat(), self.request.hostname, os.getpid())
       if not setnx(key, val):
           LOG.error('%s locked', key)
           return
        try:
            return super(MyTask, self).__call__(site, username, password)
        finally:
            r.del(key)

相关内容

  • 没有找到相关文章

最新更新