StringObservable.from(InputStream).share()会立即导致MissingBackPr



我想在Observable中读取InputStream并发出解析数据(假设是DataPacket)。我还想有不同的subscribers来处理不同类型的DataPacket(每个subscriber将应用它自己的过滤器初始observable)。这意味着,Observable应该在不同的subscribers之间共享状态。我决定使用share(),但遇到了MissingBackpressureException

以下代码失败:

readSubscription = StringObservable.from(mInStream,1024)
        .share()
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(new Action1<byte[]>() {
            @Override
            public void call(byte[] bytes) {
            }
        });

我在subscribe方法中什么都不做- subscriber应该足够快。

一切都很好,如果我删除share()。下面的代码是:

readSubscription = StringObservable.from(mInStream,1024)
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.newThread())
        .subscribe(new Action1<byte[]>() {
            @Override
            public void call(byte[] bytes) {
            }
        });

我明白,share()可能是一个昂贵的操作,我的InputStream产生很多消息(每秒约100条消息)。

我的问题:如何实现一个Observable读取InputStream,并在不同的Subscribers之间共享状态

当前的v0.22不支持正确的反压,所以您应该使用onBackpressureBuffer来避免使用MissingBackpressureException。我看看我们是否能发布最新的代码。

此外,使用share()可能会令人惊讶,因为它对订阅者进行引用计数。你不能一次订阅所有的订阅者,他们中的一些人可能不会从一开始就收到所有的值。相反,你可以使用publish()操作符,并在所有订阅者都订阅后对返回的可观察对象调用connect()

你也可以使用cache(),它将重放源到任何迟到的订阅者,但它也不支持反压,你也需要使用onBackpressureBuffer

相关内容

  • 没有找到相关文章

最新更新