我正在编写一个基于 Rx 的单向 UI 流程,其中每个状态减少都是一个Single
。通常这样的流是用scan
完成的(它们需要以前的状态(,但是当涉及Single
时,它有点棘手。我设法让它像这样工作:
val events = Observable.just("event1", "event2", "event3")
val initialState = Single.just(emptyList<String>())
// given a current state produces next state's Single
val reducer = { currentState: List<String>, event: String ->
Single.fromCallable { /* do work */ currentState.plus(event) }
}
events
.scan(
initialState,
{ currentStateSingle, event ->
val nextStateSingle = currentStateSingle
.flatMap { curState -> reducer(curState, event) }
// cache is required to avoid resubscription
// to all previously emitted single's on each new scan iteration
nextStateSingle.cache()
}
)
.flatMapSingle { it }
.subscribe { state -> println("state updated to $state") }
令我困扰的是,每个事件(在UI环境中可能有很多事件(都会创建一个nextStateSingle.cache()
并将其永久添加到现有链中,并且所有曾经发出Single
都将保留在那里,无限消耗内存并且永远不会被释放,而在它们发出一次新状态后,根本不需要它们。
我一直在考虑如何通过某种switchMap
用法来做到这一点,甚至使用一些外部原子变量来保持状态(而不是扫描(,但我找不到方法。
我看到的唯一其他选择是编写一个自定义运算符,该运算符将订阅内部单,等待结果然后处理它,但我想避免编写自定义运算符。
根据缓存的文档,由于您无法释放源并且无法清除缓存的值,因此您可以使用此解决方法,该解决方法可以通过忘记所有引用来控制缓存并清除混乱的值:
AtomicBoolean shouldStop = new AtomicBoolean();
source.takeUntil(v -> shouldStop.get())
.onTerminateDetach()
.cache()
.takeUntil(v -> shouldStop.get())
.onTerminateDetach()
.subscribe(...);
然后,也许您可以保存状态并偶尔清空引用。