假设您想将某个流中的事件存储到数据库中,并且该数据库的客户端库是异步的。
。有一个方法writeEvent(event: MyEvent): Future[Boolean]
,你必须在观察者的onNext
中调用,当下一个事件被发出时。是否有一个好的方法来做到这一点,而不是阻塞Future
?
我目前看到的唯一方法是如何实现这一点,是创建一些自定义Scheduler
,允许我将线程返回到池,直到onNext
内的异步代码完成。
您不希望在onNext
订阅者回调中像这样阻塞,这将击败Rx。它们可以用一种更习惯的方式联系在一起。
我不太与期货合作,但我想知道Observable.from(Future)
是否会工作:
someStream
.flatMap(evt => Observable.from(writeEvent(evt)))
.subscribe(
next => ...,
err => ...
)