在Observer.onNext中的RxJava/RxScala异步代码



假设您想将某个流中的事件存储到数据库中,并且该数据库的客户端库是异步的。

。有一个方法writeEvent(event: MyEvent): Future[Boolean],你必须在观察者的onNext中调用,当下一个事件被发出时。是否有一个好的方法来做到这一点,而不是阻塞Future ?

我目前看到的唯一方法是如何实现这一点,是创建一些自定义Scheduler,允许我将线程返回到池,直到onNext内的异步代码完成。

您不希望在onNext订阅者回调中像这样阻塞,这将击败Rx。它们可以用一种更习惯的方式联系在一起。

我不太与期货合作,但我想知道Observable.from(Future)是否会工作:

someStream
    .flatMap(evt => Observable.from(writeEvent(evt)))
    .subscribe(
        next => ...,
        err => ...
)

相关内容

  • 没有找到相关文章

最新更新