我通过填充一些观察器来触发长时间运行的 CPU 密集型任务,并且无法确定我是否正确设置了内容。
理想情况下,我的要求是:
- 当填充所有 3 个行为主体时触发任务(然后每次其中任何一个更改时(
- 一次只运行一个计算
- 不要提供由于源触发器更改而"过期"的计算结果
我目前的解决方案是这样的,但我不确定这是否是最好的方法。特别是,"版本"变量似乎是错误的。此外,在我的链中指定一个observeOn
似乎是错误的,但我知道这是我使用 BehaviorSubject 的副作用。
final AtomicInteger version = new AtomicInteger(0);
return Observable.combineLatest(
mBehaviorSubjectArg1,
mBehaviorSubjectArg2,
mBehaviorSubjectArg3,
(arg1, arg2, arg3) -> new Arguments(version.incrementAndGet(), arg1, arg2, arg3)
)
// Only keep the latest combination so when observeOn pulls we don't run interim data points
.toFlowable(BackpressureStrategy.LATEST)
// Only buffer 1 so we don't ask for additional arguments until we have completed the last computation.
.observeOn(Schedulers.computation(), false, 1)
.map(args -> new Pair<>(args, mCalculator.run(args)))
.filter(pair -> pair.first.version == version.get())
.map(pair -> pair.second);
这似乎是switchMap
运算符工作的完美示例。如果可以更改mCalculator.run(args)
以返回可观察对象(如果使用Observable.create
和ObservableEmitter.setCancellable
,则可能允许取消(,则以下代码将起作用:
return Observable.combineLatest(
mBehaviorSubjectArg1,
mBehaviorSubjectArg2,
mBehaviorSubjectArg3,
(arg1, arg2, arg3) -> new Arguments(arg1, arg2, arg3)
).switchMap((args) -> mCalculator.run(args).subscribeOn(Schedulers.computation()))
switchMap
将确保只有最新的Arguments
实例将同时运行,如果combineLatest
发出新实例,则取消可观察量。switchMap
内的subscribeOn
是为了确保不会因mCalculator.run
错误而发生死锁