RxJava reduce()在并行化时是否不安全?



我想在可观察对象上使用reduce()操作将其映射到Guava ImmutableList,因为我更喜欢它而不是标准ArrayList

Observable<String> strings = ...
Observable<ImmutableList<String>> captured = strings.reduce(ImmutableList.<String>builder(), (b,s) -> b.add(s))
                .map(ImmutableList.Builder::build);
captured.forEach(i -> System.out.println(i));

很简单。但是假设我在某个地方安排了可观察的strings与多个线程并行。这不会使reduce()操作脱轨并可能导致竞争条件吗?特别是因为ImmutableList.Builder很容易受到攻击?

问题在于链的实现之间的共享状态。这是我博客中的第8个陷阱:

可观察链中的共享状态

假设您对toList()操作符返回的List的性能或类型不满意,并且希望滚动您自己的聚合器来代替它。为了改变,您希望通过使用现有的操作符来做到这一点,并且您找到了操作符reduce():

Observable<Vector<Integer>> list = Observable
    .range(1, 3)
    .reduce(new Vector<Integer>(), (vector, value) -> {
        vector.add(value);
        return vector;
    });
list.subscribe(System.out::println);
list.subscribe(System.out::println);
list.subscribe(System.out::println);

当你运行'test'调用时,第一个输出你所期望的,但第二个输出一个向量,其中范围1-3出现两次,第三个订阅输出9个元素!

问题不在于reduce()操作符本身,而在于围绕它的期望。当链建立时,传入的新向量是一个"全局"实例,并将在链的所有求值之间共享。

当然,有一种方法可以解决这个问题,而不需要为整个目的实现操作符(如果您看到前面的CounterOp的潜力,应该非常简单):

Observable<Vector<Integer>> list2 = Observable
    .range(1, 3)
    .reduce((Vector<Integer>)null, (vector, value) -> {
        if (vector == null) {
            vector = new Vector<>();
        }
        vector.add(value);
        return vector;
    });
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);

您需要从null开始,并在accumulator函数中创建一个vector,现在该vector不能在订阅者之间共享。

或者,您可以查看collect()操作符,它具有初始值的工厂回调。

这里的经验法则是,当你看到一个类似聚合器的操作符获取一些普通值时,要小心,因为这个"初始值"很可能会被所有订阅者共享,如果你计划使用多个订阅者的结果流,它们将会冲突,可能会给你意想不到的结果,甚至崩溃。

根据可观察对象契约,一个可观察对象不能并行调用onNext,所以你必须修改你的strings可观察对象来尊重这一点。您可以使用序列化操作符来实现这一点。

相关内容

  • 没有找到相关文章

最新更新