如何在运算符中签入当前元素是最后一个元素



上下文:要处理Flowable<Item>,我需要首先处理第一个item,然后根据这一点,将所有项目累积为单个项目(reduce),或者在没有任何累积的情况下对每个项目应用简单映射(map)。

我可以想到的一种方法是要求操作符知道当前元素是最后一个元素。是否有这样的操作员知道当前元素是否是最后一个元素?我不能使用缓冲区,因为即使不应该进行累加,它也总是会获取2个元素。

AtomicReference<Item> itemRef = new AtomicReference();
itemRef.set(new Item());
Flowable<Item> accumulateOrProcessFlowable = source.
flatMap(item -> {
if(item.shouldBeAccumulated()) {
//Accumulate data into reference
itemRef.set(itemRef.get().addData(item.getData()));
//Return empty to throw away consumed item;
return Flowable.empty();
} else {
item.updateProperty();
return Flowable.just(item);
}
})
.applyIfLastElement(item -> {
if (item.shouldBeAccumulated()) {
return Flowable.just(itemRef.get());
}
})

下面是如何做到这一点(在RxJava 2.x中,它非常接近RxJava 3.x)。诀窍是使用defer(封装Flowable状态的最佳方式,以便可以多次订阅)和concatWith。在CCD_ 8的情况下,CCD_。请注意,作为一种性能改进,您可能并不关心我使用了单元素数组而不是AtomicReference对象(以避免不必要的易失性读取、集合等)。

Flowable<Integer> result = Flowable.defer(() -> {
boolean[] isFirst = new boolean[] { true };
Integer[] state = new Integer[1];
Maybe<Integer> last = Maybe.defer(() -> {
if (state[0] == null) {
return Maybe.empty();
} else {
return Maybe.just(state[0]);
}
});
return source //
.flatMap(x -> {
if (state[0] != null || isFirst[0] && shouldBeAccumulated(x)) {
// accumulate
state[0] = state[0] == null ? 0 : state[0] + x;
isFirst[0] = false;
return Flowable.empty();
} else {
isFirst[0] = false;
return Flowable.just(x);
}
})
.concatWith(last);
});

相关内容

最新更新