在带有Source.queue和Sink.queue的akka流媒体程序中,我提供了1000个项目,但是当我试图将它们取出时,它只是挂起



我试图了解如何在Akka流媒体中使用Source.queue和Sink.queue。 在我下面写的小测试程序中,我发现我能够成功地向Source.queue提供1000个项目。 但是,当我等待未来应该给我从队列中拉出所有这些项目的结果时,我的 未来永远不会完成。 具体来说,我们应该在最后看到的消息"打印我们从队列中拉出的内容" 永远不会打印出来 - 相反,我们看到错误"超时异常:期货在 [10 秒]后超时">

任何指导都非常感谢!

import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes}
import org.scalatest.FunSuite
import scala.collection.immutable
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
class StreamSpec extends FunSuite {
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val log: LoggingAdapter = Logging(actorSystem.eventStream, "basis-test")
implicit val ec: ExecutionContext = actorSystem.dispatcher
case class Req(name: String)
case class Response(
httpVersion: String = "",
method: String = "",
url: String = "",
headers: Map[String, String] = Map())

test("put items on queue then take them off") {
val source = Source.queue[String](128, akka.stream.OverflowStrategy.backpressure)
val flow = Flow[String].map(element => s"Modified $element")
val sink = Sink.queue[String]().withAttributes( Attributes.inputBuffer(128, 128))
val (sourceQueue, sinkQueue) = source.via(flow).toMat(sink)(Keep.both).run()
(1 to 1000).map( i =>
Future {
println("offerd" + i)             // I see this print 1000 times as expected
sourceQueue.offer(s"batch-$i")
}
)
println("DONE OFFER FUTURE FIRING")
// Now use the Sink.queue to pull the items we added onto the Source.queue
val seqOfFutures: immutable.Seq[Future[Option[String]]] = 
(1 to 1000).map{ i => sinkQueue.pull() }
val futureOfSeq: Future[immutable.Seq[Option[String]]] = 
Future.sequence(seqOfFutures)
val seq: immutable.Seq[Option[String]] = 
Await.result(futureOfSeq, 10.second)
// unfortunately our future times out here
println("print what we pulled off the queue:" + seq);
}
}

再看一遍,我意识到我最初设置和提出我的问题不正确。

伴随我原始问题的测试掀起了一波浪潮 的 1000 个期货,每个期货都试图向队列提供 1 个项目。 然后,该测试尝试的第二步创建一个 1000 个元素的序列 (seqOfFutures( 每个未来都试图从队列中提取一个值。 关于为什么我得到超时错误的理论是,由于运行而存在某种死锁 线程不足或由于一个线程在另一个线程上等待但等待的线程被阻塞, 或类似的东西。

在这一点上,我对寻找确切原因不感兴趣,因为我已经纠正了 下面的代码中的内容(请参阅更正的代码(。

在新代码中,调用使用队列的测试: "将项目放在队列中,然后将它们移除(使用异步并行性(- (3("。

在这个测试中,我有一组 10 个任务并行运行以执行"enequeue"操作。 然后我还有另外 10 个任务执行取消排队操作,这不仅涉及 列表中的项目,但也调用 stringModifyFunc,这引入了 1 毫秒的处理延迟。

我还想证明我从 并行启动任务,并通过将任务步骤的结果传递到 队列,所以测试 3 作为定时操作运行,我发现它需要 1.9 秒。

测试 (1( 和 (2( 执行相同的工作量,但按顺序进行 -- 第一个没有干预队列,第二个没有干预队列 使用队列在步骤之间传递结果。这些测试分别在 13.6 秒和 15.6 秒内运行 (这表明队列增加了一些开销,但这被并行运行任务的效率所掩盖。

更正的代码

import akka.{Done, NotUsed}
import akka.actor.ActorSystem
import akka.event.{Logging, LoggingAdapter}
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import akka.stream.{ActorMaterializer, Attributes, QueueOfferResult}
import org.scalatest.FunSuite
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future}
class Speco extends FunSuite {
implicit val actorSystem: ActorSystem = ActorSystem()
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val log: LoggingAdapter = Logging(actorSystem.eventStream, "basis-test")
implicit val ec: ExecutionContext = actorSystem.dispatcher
val stringModifyFunc: String => String = element => {
Thread.sleep(1)
s"Modified $element"
}
def setup = {
val source = Source.queue[String](128, akka.stream.OverflowStrategy.backpressure)
val sink = Sink.queue[String]().withAttributes(Attributes.inputBuffer(128, 128))
val (sourceQueue, sinkQueue) = source.toMat(sink)(Keep.both).run()
val offers: Source[String, NotUsed] = Source(
(1 to iterations).map { i =>
s"item-$i"
}
)
(sourceQueue,sinkQueue,offers)
}
val outer = 10
val inner = 1000
val iterations = outer * inner
def timedOperation[T](block : => T) = {
val t0 = System.nanoTime()
val result: T = block    // call-by-name
val t1 = System.nanoTime()
println("Elapsed time: " + (t1 - t0) / (1000 * 1000) + " milliseconds")
result
}
test("20k iterations in single threaded loop no queue (1)") {
timedOperation{
(1 to iterations).foreach { i =>
val str = stringModifyFunc(s"tag-${i.toString}")
System.out.println("str:" + str);
}
}
}
test("20k iterations in single threaded loop with queue (2)") {
timedOperation{
val (sourceQueue, sinkQueue, offers) = setup
val resultFuture: Future[Done] = offers.runForeach{ str =>
val itemFuture = for {
_ <- sourceQueue.offer(str)
item <- sinkQueue.pull()
} yield (stringModifyFunc(item.getOrElse("failed")) )
val item = Await.result(itemFuture, 10.second)
System.out.println("item:" + item);
}
val  result = Await.result(resultFuture, 20.second)
System.out.println("result:" + result);
}
}
test("put items on queue then take them off (with async parallelism) - (3)") {
timedOperation{
val (sourceQueue, sinkQueue, offers) = setup
def enqueue(str: String) = sourceQueue.offer(str)
def dequeue = {
sinkQueue.pull().map{
maybeStr =>
val str =    stringModifyFunc( maybeStr.getOrElse("failed2"))
println(s"dequeud value is $str")
}
}
val offerResults: Source[QueueOfferResult, NotUsed] =
offers.mapAsyncUnordered(10){ string => enqueue(string)}
val dequeueResults: Source[Unit, NotUsed] = offerResults.mapAsyncUnordered(10){ _ =>  dequeue }
val runAll: Future[Done] = dequeueResults.runForeach(u => u)
Await.result(runAll, 20.second)
}
}
}

最新更新