Akka Streams:在GraphStage Source中处理Future



我正在尝试构建一个Akka Stream,它通过调用FutureAPI来接收数据(API的本质是滚动,它递增地获取结果(。为了构建这样的Source,我正在使用GraphStage。

我修改了NumberSource示例,该示例一次只推送一个Int。我所做的唯一更改是将Int替换为getvalue(): Future[Int](以模拟API调用(:

class NumbersSource extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)
// simple example of future API call
private def getvalue(): Future[Int] = Future.successful(Random.nextInt())

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = {
// Future API call
getvalue().onComplete{
case Success(value) =>
println("Pushing value received..") // this is currently being printed just once
push(out, counter)
case Failure(exception) =>
}
}
}
})
}
}
// Using the Source and Running the stream
val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource
val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)
val done: Future[Done] = mySource.runForeach{
num => println(s"Received: $num") // This is currently not printed
}
done.onComplete(_ => system.terminate())

上面的代码不起作用。setHandler中的println语句只执行一次,不会向下游推送任何内容。

应如何处理此类Future呼叫?谢谢

更新

我尝试通过如下更改来使用getAsyncCallback:

class NumbersSource(futureNum: Future[Int]) extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
override def preStart(): Unit = {
val callback = getAsyncCallback[Int] { (_) =>
completeStage()
}
futureNum.foreach(callback.invoke)
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
val value: Int = ??? // How to get this value ??
push(out, value)
}
})
}
}
// Using the Source and Running the Stream
def random(): Future[Int] = Future.successful(Random.nextInt())
val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource(random())
val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)
val done: Future[Done] = mySource.runForeach{
num => println(s"Received: $num") // This is currently not printed
}
done.onComplete(_ => system.terminate())

但是,现在我陷入了如何获取Future计算的价值的困境。在GraphStageFlow的情况下,我可以使用:

val value = grab(in) // where in is Inlet of a Flow

但是,我有一个GraphStageSource,所以我不知道如何获取上面计算的Future的Int值。

我不确定我是否理解正确,但如果你试图用Futures中计算的元素实现一个无限源,那么真的没有必要用你自己的GraphStage来实现。你可以简单地做如下:

Source.repeat(())
.mapAsync(parallelism) { _ => Future.successful(Random.nextInt()) }

Source.repeat(())只是一些任意值的无限源(在本例中为Unit类型,但您可以将()更改为您想要的任何值,因为这里忽略了它(。然后使用CCD_ 15将异步计算集成到流中。

我会加入另一个答案,以避免创建自己的graphstage。经过一些实验,这似乎对我有效:

type Data = Int
trait DbResponse {
// this is just a callback for a compact solution
def nextPage: Option[() => Future[DbResponse]] 
def data: List[Data]
}
def createSource(dbCall: DbResponse): Source[Data, NotUsed] = {
val thisPageSource = Source.apply(dbCall.data)
val nextPageSource = dbCall.nextPage match {
case Some(dbCallBack) => Source.lazySource(() => Source.future(dbCallBack()).flatMapConcat(createSource))
case None             => Source.empty
}
thisPageSource.concat(nextPageSource)
}
val dataSource: Source[Data, NotUsed] = Source
.future(???: Future[DbResponse]) // the first db call
.flatMapConcat(createSource)

我试过了,它几乎完美地工作了,我不知道为什么,但第二页是即时请求的,但其余的会按预期工作(有背压和其他什么(。

最新更新