Python/rq-监视工作状态



如果这是一个愚蠢的问题,我道歉,我会羞愧地躲起来,但是:

我在Python中使用rq对作业进行排队。我希望它能这样工作:

  1. 作业A开始。作业A通过web API获取数据并存储数据
  2. 作业A正在运行
  3. 作业A完成
  4. 完成A后,作业B开始。作业B检查作业A存储的每个记录,并添加一些额外的响应数据
  5. 完成作业B后,用户会收到一封愉快的电子邮件,说他们的报告已经准备好了

到目前为止我的代码:

redis_conn = Redis()
use_connection(redis_conn)
q = Queue('normal', connection=redis_conn) # this is terrible, I know - fixing later
w = Worker(q)
job = q.enqueue(getlinksmod.lsGet, theURL,total,domainid)
w.work()

我认为我的最佳解决方案是有两个工人,一个负责工作A,一个用于工作B。工作B的工人可以监控工作A,当工作A完成时,开始工作B。

我想不出如何让一名员工监控另一名员工的状态来挽救我的生命。我可以用job.ID从作业A中获取作业ID。我可以用w.name获取工人姓名。但我不知道如何将这些信息传递给其他工人。

或者,有没有一种更简单的方法可以做到这一点,而我完全没有?

更新januari 2015,此拉取请求现在被合并,参数被重命名为depends_on,即:

second_job = q.enqueue(email_customer, depends_on=first_job)

原始帖子原封不动地留给运行旧版本的人,比如:

我已经提交了一个提取请求(https://github.com/nvie/rq/pull/207)以处理RQ中的作业依赖性。当这个拉取请求被合并时,您可以执行以下操作:

def generate_report():
    pass
def email_customer():
    pass
first_job = q.enqueue(generate_report)
second_job = q.enqueue(email_customer, after=first_job)
# In the second enqueue call, job is created,
# but only moved into queue after first_job finishes

现在,我建议编写一个包装器函数来按顺序运行您的作业。例如:

def generate_report():
     pass
def email_customer():
    pass
def generate_report_and_email():
    generate_report()
    email_customer() # You can also enqueue this function, if you really want to
# Somewhere else
q.enqueue(generate_report_and_email)

rq文档的这个页面上,看起来每个job对象都有一个result属性,可由job.result调用,您可以检查该属性。如果作业还没有完成,它将是None,但如果您确保您的作业返回一些值(甚至只是"Done"),那么您可以让您的其他工作人员检查第一个作业的结果,然后只有当job.result有值时才开始工作,这意味着第一个工作人员已经完成。

您可能对项目太深入而无法切换,但如果没有,请查看Twisted。http://twistedmatrix.com/trac/我现在正在将它用于一个访问API、抓取web内容等的项目。它并行运行多个作业,并按顺序组织某些作业,因此作业B在作业a完成之前不会执行。

如果你想尝试的话,这是学习Twisted的最佳教程。http://krondo.com/?page_id=1327

将作业A和作业B在一个函数中所做的事情组合起来,然后使用例如multiprocessing.Pool(它是map_async方法)将其分布在不同的进程中。

我不熟悉rq,但multiprocessing是标准库的一部分。默认情况下,它使用的进程数量与CPU的内核数量一样多,根据我的经验,这通常足以使机器饱和。

最新更新