RxJava:使用基于单一的累加器进行扫描,该累加器处理中间的单一



我正在编写一个基于 Rx 的单向 UI 流程,其中每个状态减少都是一个Single。通常这样的流是用scan完成的(它们需要以前的状态(,但是当涉及Single时,它有点棘手。我设法让它像这样工作:

val events = Observable.just("event1", "event2", "event3")
val initialState = Single.just(emptyList<String>())
// given a current state produces next state's Single
val reducer = { currentState: List<String>, event: String ->
Single.fromCallable { /* do work */ currentState.plus(event) }
}
events
.scan(
initialState,
{ currentStateSingle, event ->
val nextStateSingle = currentStateSingle
.flatMap { curState -> reducer(curState, event) }
// cache is required to avoid resubscription 
// to all previously emitted single's on each new scan iteration
nextStateSingle.cache()
}
)
.flatMapSingle { it }
.subscribe { state -> println("state updated to $state") }

令我困扰的是,每个事件(在UI环境中可能有很多事件(都会创建一个nextStateSingle.cache()并将其永久添加到现有链中,并且所有曾经发出Single都将保留在那里,无限消耗内存并且永远不会被释放,而在它们发出一次新状态后,根本不需要它们。

我一直在考虑如何通过某种switchMap用法来做到这一点,甚至使用一些外部原子变量来保持状态(而不是扫描(,但我找不到方法。

我看到的唯一其他选择是编写一个自定义运算符,该运算符将订阅内部单,等待结果然后处理它,但我想避免编写自定义运算符。

根据缓存的文档,由于您无法释放源并且无法清除缓存的值,因此您可以使用此解决方法,该解决方法可以通过忘记所有引用来控制缓存并清除混乱的值:

AtomicBoolean shouldStop = new AtomicBoolean();
source.takeUntil(v -> shouldStop.get())
.onTerminateDetach()
.cache()
.takeUntil(v -> shouldStop.get())
.onTerminateDetach()
.subscribe(...);

然后,也许您可以保存状态并偶尔清空引用。

最新更新