如何确定一组Resque作业是否已完成



我有以下代码:

[23, 45, 69, 20].each do |page_id|
  Resque.enqueue(ProcessPage, page_id)
end

我想知道所有这些作业什么时候都完成了处理,以便通过电子邮件通知用户。我的第一次尝试是在作业的最后通知用户,但这只适用于每个作业。

我发现了这个宝石的resque状态,基本上让你这样做:

job_ids = []
[23, 45, 69, 20].each do |page_id|
  job_ids << ProcessPage.create(page_id)
end

然后你可以通过以下操作来检查作业的状态:

status = Resque::Plugins::Status::Hash.get(job_id)

让我困惑的是……我应该什么时候检查所有状态?我的意思是,我会检查一下所有工作的状态吗?那会让服务器超时,你知道我该怎么做吗?

我不打算在这里粘贴任何代码,因为看起来您非常熟悉应该如何编写作业,但请考虑以下想法:

您应该有一个resque作业,它获取要处理的id数组。此resque作业为每个id创建一个resque作业,以便并行处理它们。在为特定的id创建resque作业后,它将创建另一个resque作业,该作业将获取id列表,并定期检查它们是否完成。完成后,此作业将向用户发送一封电子邮件。

有了这个范式,你可以享受所有的世界:

  1. 每个ID都有自己的作业
  2. 并行执行
  3. 状态检查是在后台完成的,因此用户不会超时

更新:

用于检查resque作业的伪代码:

class CheckerJob
  @queue = :long
  def self.perform(ids)
    finished = []
    while finished.size < ids.size
      ids.each do |id|
        finished << id if job_finished?(id)
      end
      sleep 10
    end
    send_email_to_user
  end
end

现在您所要做的就是同时实现job_finished?(id)send_email_to_user方法。

简单的解决方案是让你的工作接受一个ID列表:

Class Job
  def self.perform(ids)
    threads = ids.inject([]) do |memo, id|
      memo << Thread.new { # whatever }
    end
    ThreadsWait.join_all(*threads)
    # All threads are done
    # Notify user that job is finished
  end
end

没有一项工作会检查这项工作是否已经完成。如果你打算走那种路线,订阅Redis活动http://redis.io/topics/internals-rediseventlib

UPDATE:添加了多线程

这样做怎么样?

Resque.enqueue_to("page_processes#{user.id}", ProcessPage, page_id)
#code in ProcessPageWorker below
def self.perform(page_id, user_id)
  #work
  # code below notifies user if the 'page_processes' queue is equal to 1
  notify_user if Resque.size("page_processes#{user_id}") == 1
end

我们将作业放入一个名为"page_processes"的队列中,再加上用户的ID。每个作业完成后,我们会检查"page_processes#{user_ID}"队列的大小。如果队列等于1,则意味着正在处理的作业是队列中的最后一个作业,因此我们应该通知用户。我们在队列的名称中包含user_id,以确保队列的名称是唯一的。

如果这有帮助,请告诉我。

最新更新