我目前正在尝试使用Monix来限制api get请求。我试过使用STTP的Monix后端,它运行良好,直到我完成后无法关闭Monix后端。。。由于这看起来更像是一个sttp问题,而不是Monix问题,我试图通过使用sttp的默认后端来重新解决这个问题,同时仍然使用Monix来节流。
我主要是在消费完可观察的后关闭monix后端
我试图通过以下方式简化问题:
import monix.execution.Scheduler.Implicits.global
val someIter = List(Task(1), Task(2))
val obs: Observable[CancelableFuture[Int]] = Observable
.fromIterable(someIter)
.throttle(3.second, 1)
.map(_.runToFuture)
然而,我仍然不确定如何在Observable被使用后关闭程序,因为它在这里提前终止(不像monix后端的情况(。。。
换句话说,在Observable迭代完成之前,我如何阻止终止程序?
您可以创建Promise
,在.doOnComplete
完成Observable
时完成
并在主线中等待它。
import monix.execution.Scheduler.Implicits.global
val someIter = List(Task(1), Task(2))
val promise = Promise()
val obs: Observable[CancelableFuture[Int]] = Observable.fromIterable(someIter).throttle(3.second, 1)
.map(_.runToFuture)
.doOnComplete(Task { promise.complete(Success()) })
Await.ready(promise.future, Duration.Inf)
除了Artem接受的答案,以及Monix Gitter社区的见解之外,另一个潜在的实现可能是:
val someIter = List(Task(1), Task(2))
val obs =
Observable
.fromIterable(someIter)
.throttle(1 second, 10)
.mapParallelUnordered(10)(x => x.map(x => x.send().body)) // Here we send requests
.sumL // Sum just as an example
.runSyncUnsafe()