我如何管理ruby线程以便它们完成所有工作



我有一个可以划分为独立单元的计算,我现在处理它的方式是创建固定数量的线程,然后在每个线程中分配要完成的工作块。所以在伪代码中,它看起来像

# main thread
work_units.take(10).each {|work_unit| spawn_thread_for work_unit}
def spawn_thread_for(work)
  Thread.new do
    do_some work
    more_work = work_units.pop
    spawn_thread_for more_work unless more_work.nil?
  end
end

基本上,一旦创建了初始数量的线程,每个线程都会做一些工作,然后不断从工作堆栈中获取要做的东西,直到什么都没有留下。当我在irb中运行时,一切都很好,但当我使用解释器执行脚本时,事情就不那么好了。我不知道如何让主线程等待所有工作完成。有没有一种很好的方法可以做到这一点,或者我一直在主线程中执行sleep 10 until work_units.empty?

在ruby 1.9(和2.0)中,您可以使用stdlib中的ThreadsWait来实现此目的:

require 'thread'
require 'thwait'
threads = []
threads << Thread.new { }
threads << Thread.new { }
ThreadsWait.all_waits(*threads)

如果修改spawn_thread_for以保存对创建的Thread的引用,则可以在线程上调用Thread#join以等待完成:

x = Thread.new { sleep 0.1; print "x"; print "y"; print "z" }
a = Thread.new { print "a"; print "b"; sleep 0.2; print "c" }
x.join # Let the threads finish before
a.join # main thread exits...

生产:

abxyzc

(从ri Thread.new文档中窃取。有关更多详细信息,请参阅ri Thread.join文档。)

因此,如果您修改spawn_thread_for以保存线程引用,则可以加入所有引用:

(未经测试,但应该有味道)

# main thread
work_units = Queue.new # and fill the queue...
threads = []
10.downto(1) do
  threads << Thread.new do
    loop do
      w = work_units.pop
      Thread::exit() if w.nil?
      do_some_work(w)
    end
  end
end
# main thread continues while work threads devour work
threads.each(&:join)
Thread.list.each{ |t| t.join unless t == Thread.current }

您似乎正在复制Parallel Each(Peach)库提供的内容。

您可以使用线程#加入

加入(p1=v1)公共

调用线程将暂停执行并运行thr。直到thr退出或超过限制秒后才返回。如果超过时间限制,则返回nil,否则返回thr。

此外,您还可以使用Enumerable#each_slice来批量迭代中的工作单元

work_units.each_slice(10) do |batch|
  # handle each work unit in a thread
  threads = batch.map do |work_unit|
    spawn_thread_for work_unit
  end
  # wait until current batch work units finish before handling the next batch
  threads.each(&:join)
end

相关内容

  • 没有找到相关文章

最新更新