如何运行Monix的parSequenceUnordered,并处理每个任务的结果



我目前正在实现对API的客户端http请求,并决定探索sttp&此任务的monix。由于我是Monix的新手,我仍然不确定如何运行任务并检索它们的结果。我的目标是获得一系列http请求结果,我可以并行调用这些结果->解析->负载

以下是我迄今为止尝试的一个片段:

import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
object SO extends App {
val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
.header("accept", "application/json")
.response(asString)
.body()
.send()
val tasks = Seq(r1).map(i => Task(i))
Task.parSequenceUnordered(tasks).guarantee(backend.close())
}

import monix.execution.Scheduler.Implicits.global
postTask.runToFuture.foreach(println) // prints: List(Task.FlatMap$2052527361)
}

我的困惑很简单(我猜(。我如何运行我创建的Task.parSequenceUnordered,并处理(解析http结果(序列中的任务?

很高兴:出于好奇,在处理任务请求序列时,是否可能天真地引入速率限制/节流?我并不是真的想建造一些复杂的东西。它可以很简单,将一批批请求间隔开。想知道Monix是否已经有助手了。

感谢Oleg Pyzhcov和monix gitter社区帮助我解决了这个问题。

此处引用Oleg:

由于您已经在使用支持monix的后端,因此r1的类型是CCD_ 2。所以当你在做Seq(r1).map(i => Task(i)),你让它成为一系列没有除了给你其他能给你带来结果的任务(类型将是Seq[Task[Task[Response[...]]]](。然后您的代码并行化外层,赋予任务的任务,你得到的任务作为结果,从开始。您只需要为它处理一个Seq(r1(并行运行请求。

如果使用Intellij,可以按Alt + =查看选择-如果不能单独从代码中区分类型,它会有所帮助(但随着经验的积累,情况会变得更好(。

至于速率限制,我们有parSequenceN,可以设置限制并行性。请注意,无序只意味着你会受到轻视性能优势以结果的随机顺序为代价输出,它们无论如何都是非决定性地执行的。

我最终得到了一个(简化的(实现,看起来像这样:

import sttp.client._
import sttp.client.asynchttpclient.monix._
import monix.eval.Task
object SO extends App {
val postTask = AsyncHttpClientMonixBackend().flatMap { implicit backend =>
val r1 = basicRequest.get(uri"https://hello.world.io/v1/bla")
.header("accept", "application/json")
.response(asString)
.body()
.send()
val items = Seq(r1.map(x => x.body))
Task.parSequenceN(1)(items).guarantee(backend.close())
}

import monix.execution.Scheduler.Implicits.global
postTask.runToFuture.foreach(println)
}

相关内容

  • 没有找到相关文章

最新更新