我正在尝试使用Scala Futures产生许多CPU密集作业。因为有很多东西,所以我需要节省这些作业(线程)的创建。为此我使用:
import java.util.concurrent.ArrayBlockingQueue
import scala.concurrent._
val numThread = sys.runtime.availableProcessors
import java.util.concurrent.ExecutorService
import java.util.concurrent.ArrayBlockingQueue
implicit val context = ExecutionContext.fromExecutorService(
new ThreadPoolExecutor(
numThread, numThread,
0L, TimeUnit.SECONDS,
new ArrayBlockingQueue[ Runnable ]( numThread ) {
override def offer( e: Runnable ) = {
put( e ); // Waiting for empty room
true
}
})
)
要测试这个我创建了2个非常简单的功能:
import scala.util.{ Try, Success, Failure }
import scala.util.Random
def longComputation() = {
val id = Thread.currentThread().getId
//blocking {
println( s"Started thread: $id" )
Thread.sleep( 500 )
println( s"Finished thread: $id" )
//}
id
}
def processResult[T](r : Try[T]) = {
blocking {
r match {
case Success( id ) => println( s"Thread result: $id" )
case Failure( t ) => println( "An error has occured: " + t.getMessage )
}
}
}
然后,我执行测试以通过多线程执行任务:def main( args: Array[ String ] ) {
val s = Stream.from( 0 )
//s.foreach { x => println(x) ; val f = Future( longComputation ) ; f.onComplete{ processResult } }
s.foreach { x =>
println(x)
val f = Future( longComputation )
val p = Promise[Long]()
p completeWith f
p.future.onComplete{ processResult }
}
println("Finished")
context.shutdown
}
当我执行此操作时,启动了16个线程(CPU计数为8)。该程序打印了"完成"信息。然后系统锁定,没有其他执行。但是,如果我删除回调,则该线程将按预期执行 ad infinitum 。
上面我已经尝试了blocking
并使用Promise
。行为没有变化。因此,我的问题是:如何在不阻止回调的情况下踩下任务执行?如果不可能,在线程(未来)中进行I/O可行吗?
感谢任何指针。
该程序在僵局中运行。提供的threadPool
的大小为固定尺寸,因此发生以下情况: Future(longComputation)
从线程池分配线程并开始工作。完成后,onComplete
从池中分配Thread
来执行提供的功能。
鉴于工作要比完成工作要花费的时间更长,在某个时候,所有线程都在忙于工作。他们中的任何一个都完成,并且onComplete
也需要一个线程,因此它要求执行程序为一个。工作无法完成,因为所有线程都很忙,并且机器在僵局中停止。
我们可以通过向消费者提供保留资源来解决这一生产者消费者的僵局。这样,工作被固定尺寸的线程池促进了工作,但是我们确保任何完成的工作都可以进一步处理。
这个片段,我将context
重命名为fixedContext
,显示了使用单独的上下文处理结果,解决僵局的使用。我也摆脱了Promise
,除了代表未来之外,它没有发挥真正的功能。
val fixedContext = // same as in question
val singleThreadContext = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(1))
...
...
def main( args: Array[ String ] ) {
val s = Stream.from( 0 )
s.foreach { x =>
println(x)
val f = Future( longComputation )(fixedContext)
f.onComplete{ processResult }(singleThreadContext)
}
println("Finished")
fixedContext.shutdown
}
}
线程完成longComputation
时,它试图将作业放在队列上以执行回调并被阻止,因为队列已满。因此,最终,第一个"批次"工作完成了,但是所有线程仍然很忙,等待队列安排回调,没有什么可用的排队。
解决方案?从队列中删除极限。这样,试图提交回调的线程就不会被阻止,并且将可用来掌握下一个任务。
您可能希望将某些东西插入生产者循环中,以使其慢一点,以使无限的队列不会吞噬所有记忆。 Semaphore
也许?
val sem = new Semaphore(numThread*2)
def processResult[T](r : Try[T]) = blocking {
r match {
case Success( id ) => println( s"Thread result: $id" )
case Failure( t ) => println( "An error has occured: " + t.getMessage )
}
sem.release
}
Stream.from(0).foreach { _ =>
sem.acquire
new Future(longComputation).onComplete(processResult)
}
您不需要自定义执行上下文 - Scala的默认值实际上可以更好地工作