我想在Observable
中读取InputStream
并发出解析数据(假设是DataPacket
)。我还想有不同的subscribers
来处理不同类型的DataPacket
(每个subscriber
将应用它自己的过滤器初始observable
)。这意味着,Observable
应该在不同的subscribers
之间共享状态。我决定使用share()
,但遇到了MissingBackpressureException
。
以下代码失败:
readSubscription = StringObservable.from(mInStream,1024)
.share()
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
}
});
我在subscribe
方法中什么都不做- subscriber
应该足够快。
一切都很好,如果我删除share()
。下面的代码是:
readSubscription = StringObservable.from(mInStream,1024)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.newThread())
.subscribe(new Action1<byte[]>() {
@Override
public void call(byte[] bytes) {
}
});
我明白,share()
可能是一个昂贵的操作,我的InputStream
产生很多消息(每秒约100条消息)。
我的问题:如何实现一个Observable
读取InputStream
,并在不同的Subscribers
之间共享状态
当前的v0.22不支持正确的反压,所以您应该使用onBackpressureBuffer
来避免使用MissingBackpressureException
。我看看我们是否能发布最新的代码。
此外,使用share()
可能会令人惊讶,因为它对订阅者进行引用计数。你不能一次订阅所有的订阅者,他们中的一些人可能不会从一开始就收到所有的值。相反,你可以使用publish()
操作符,并在所有订阅者都订阅后对返回的可观察对象调用connect()
。
你也可以使用cache()
,它将重放源到任何迟到的订阅者,但它也不支持反压,你也需要使用onBackpressureBuffer
。