我想在可观察对象上使用reduce()
操作将其映射到Guava ImmutableList
,因为我更喜欢它而不是标准ArrayList
。
Observable<String> strings = ...
Observable<ImmutableList<String>> captured = strings.reduce(ImmutableList.<String>builder(), (b,s) -> b.add(s))
.map(ImmutableList.Builder::build);
captured.forEach(i -> System.out.println(i));
很简单。但是假设我在某个地方安排了可观察的strings
与多个线程并行。这不会使reduce()
操作脱轨并可能导致竞争条件吗?特别是因为ImmutableList.Builder
很容易受到攻击?
问题在于链的实现之间的共享状态。这是我博客中的第8个陷阱:
可观察链中的共享状态
假设您对toList()操作符返回的List的性能或类型不满意,并且希望滚动您自己的聚合器来代替它。为了改变,您希望通过使用现有的操作符来做到这一点,并且您找到了操作符reduce():
Observable<Vector<Integer>> list = Observable
.range(1, 3)
.reduce(new Vector<Integer>(), (vector, value) -> {
vector.add(value);
return vector;
});
list.subscribe(System.out::println);
list.subscribe(System.out::println);
list.subscribe(System.out::println);
当你运行'test'调用时,第一个输出你所期望的,但第二个输出一个向量,其中范围1-3出现两次,第三个订阅输出9个元素!
问题不在于reduce()操作符本身,而在于围绕它的期望。当链建立时,传入的新向量是一个"全局"实例,并将在链的所有求值之间共享。
当然,有一种方法可以解决这个问题,而不需要为整个目的实现操作符(如果您看到前面的CounterOp的潜力,应该非常简单):
Observable<Vector<Integer>> list2 = Observable
.range(1, 3)
.reduce((Vector<Integer>)null, (vector, value) -> {
if (vector == null) {
vector = new Vector<>();
}
vector.add(value);
return vector;
});
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
list2.subscribe(System.out::println);
您需要从null开始,并在accumulator函数中创建一个vector,现在该vector不能在订阅者之间共享。
或者,您可以查看collect()
操作符,它具有初始值的工厂回调。
这里的经验法则是,当你看到一个类似聚合器的操作符获取一些普通值时,要小心,因为这个"初始值"很可能会被所有订阅者共享,如果你计划使用多个订阅者的结果流,它们将会冲突,可能会给你意想不到的结果,甚至崩溃。
根据可观察对象契约,一个可观察对象不能并行调用onNext
,所以你必须修改你的strings
可观察对象来尊重这一点。您可以使用序列化操作符来实现这一点。