在Observable消耗完成之前,我如何阻止终止我的程序



我目前正在尝试使用Monix来限制api get请求。我试过使用STTP的Monix后端,它运行良好,直到我完成后无法关闭Monix后端。。。由于这看起来更像是一个sttp问题,而不是Monix问题,我试图通过使用sttp的默认后端来重新解决这个问题,同时仍然使用Monix来节流。

我主要是在消费完可观察的后关闭monix后端

我试图通过以下方式简化问题:

import monix.execution.Scheduler.Implicits.global
val someIter = List(Task(1), Task(2))
val obs: Observable[CancelableFuture[Int]] = Observable
.fromIterable(someIter)
.throttle(3.second, 1)
.map(_.runToFuture)

然而,我仍然不确定如何在Observable被使用后关闭程序,因为它在这里提前终止(不像monix后端的情况(。。。

换句话说,在Observable迭代完成之前,我如何阻止终止程序?

您可以创建Promise,在.doOnComplete完成Observable时完成

并在主线中等待它。

import monix.execution.Scheduler.Implicits.global
val someIter = List(Task(1), Task(2))
val promise = Promise()
val obs: Observable[CancelableFuture[Int]] = Observable.fromIterable(someIter).throttle(3.second, 1)
.map(_.runToFuture)
.doOnComplete(Task { promise.complete(Success()) })
Await.ready(promise.future, Duration.Inf)

除了Artem接受的答案,以及Monix Gitter社区的见解之外,另一个潜在的实现可能是:

val someIter = List(Task(1), Task(2))
val obs =
Observable
.fromIterable(someIter)
.throttle(1 second, 10)
.mapParallelUnordered(10)(x => x.map(x => x.send().body)) // Here we send requests
.sumL // Sum just as an example 
.runSyncUnsafe()

相关内容

  • 没有找到相关文章

最新更新