我正在尝试构建一个Akka Stream源,它通过调用Future
API来接收数据(API的本质是滚动,它递增地获取结果(。为了构建这样的Source,我正在使用GraphStage。
我修改了NumberSource示例,该示例一次只推送一个Int
。我所做的唯一更改是将Int
替换为getvalue(): Future[Int]
(以模拟API调用(:
class NumbersSource extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)
// simple example of future API call
private def getvalue(): Future[Int] = Future.successful(Random.nextInt())
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = {
// Future API call
getvalue().onComplete{
case Success(value) =>
println("Pushing value received..") // this is currently being printed just once
push(out, counter)
case Failure(exception) =>
}
}
}
})
}
}
// Using the Source and Running the stream
val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource
val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)
val done: Future[Done] = mySource.runForeach{
num => println(s"Received: $num") // This is currently not printed
}
done.onComplete(_ => system.terminate())
上面的代码不起作用。setHandler
中的println语句只执行一次,不会向下游推送任何内容。
应如何处理此类Future呼叫?谢谢
更新
我尝试通过如下更改来使用getAsyncCallback:
class NumbersSource(futureNum: Future[Int]) extends GraphStage[SourceShape[Int]] {
val out: Outlet[Int] = Outlet("NumbersSource")
override val shape: SourceShape[Int] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
override def preStart(): Unit = {
val callback = getAsyncCallback[Int] { (_) =>
completeStage()
}
futureNum.foreach(callback.invoke)
}
setHandler(out, new OutHandler {
override def onPull(): Unit = {
val value: Int = ??? // How to get this value ??
push(out, value)
}
})
}
}
// Using the Source and Running the Stream
def random(): Future[Int] = Future.successful(Random.nextInt())
val sourceGraph: Graph[SourceShape[Int], NotUsed] = new NumbersSource(random())
val mySource: Source[Int, NotUsed] = Source.fromGraph(sourceGraph)
val done: Future[Done] = mySource.runForeach{
num => println(s"Received: $num") // This is currently not printed
}
done.onComplete(_ => system.terminate())
但是,现在我陷入了如何获取Future计算的价值的困境。在GraphStage
、Flow
的情况下,我可以使用:
val value = grab(in) // where in is Inlet of a Flow
但是,我有一个GraphStage
,Source
,所以我不知道如何获取上面计算的Future的Int值。
我不确定我是否理解正确,但如果你试图用Future
s中计算的元素实现一个无限源,那么真的没有必要用你自己的GraphStage
来实现。你可以简单地做如下:
Source.repeat(())
.mapAsync(parallelism) { _ => Future.successful(Random.nextInt()) }
Source.repeat(())
只是一些任意值的无限源(在本例中为Unit
类型,但您可以将()
更改为您想要的任何值,因为这里忽略了它(。然后使用CCD_ 15将异步计算集成到流中。
我会加入另一个答案,以避免创建自己的graphstage。经过一些实验,这似乎对我有效:
type Data = Int
trait DbResponse {
// this is just a callback for a compact solution
def nextPage: Option[() => Future[DbResponse]]
def data: List[Data]
}
def createSource(dbCall: DbResponse): Source[Data, NotUsed] = {
val thisPageSource = Source.apply(dbCall.data)
val nextPageSource = dbCall.nextPage match {
case Some(dbCallBack) => Source.lazySource(() => Source.future(dbCallBack()).flatMapConcat(createSource))
case None => Source.empty
}
thisPageSource.concat(nextPageSource)
}
val dataSource: Source[Data, NotUsed] = Source
.future(???: Future[DbResponse]) // the first db call
.flatMapConcat(createSource)
我试过了,它几乎完美地工作了,我不知道为什么,但第二页是即时请求的,但其余的会按预期工作(有背压和其他什么(。