如果"source"可观察量发生变化,如何不为长时间运行的任务提供结果



我通过填充一些观察器来触发长时间运行的 CPU 密集型任务,并且无法确定我是否正确设置了内容。

理想情况下,我的要求是:

  • 当填充所有 3 个行为主体时触发任务(然后每次其中任何一个更改时(
  • 一次只运行一个计算
  • 不要提供由于源触发器更改而"过期"的计算结果

我目前的解决方案是这样的,但我不确定这是否是最好的方法。特别是,"版本"变量似乎是错误的。此外,在我的链中指定一个observeOn似乎是错误的,但我知道这是我使用 BehaviorSubject 的副作用。

    final AtomicInteger version = new AtomicInteger(0);
    return Observable.combineLatest(
                mBehaviorSubjectArg1,
                mBehaviorSubjectArg2,
                mBehaviorSubjectArg3,
                (arg1, arg2, arg3) -> new Arguments(version.incrementAndGet(), arg1, arg2, arg3)
            )
            // Only keep the latest combination so when observeOn pulls we don't run interim data points
            .toFlowable(BackpressureStrategy.LATEST)
            // Only buffer 1 so we don't ask for additional arguments until we have completed the last computation. 
            .observeOn(Schedulers.computation(), false, 1)
            .map(args -> new Pair<>(args, mCalculator.run(args)))
            .filter(pair -> pair.first.version == version.get())
            .map(pair -> pair.second);

这似乎是switchMap运算符工作的完美示例。如果可以更改mCalculator.run(args)以返回可观察对象(如果使用Observable.createObservableEmitter.setCancellable,则可能允许取消(,则以下代码将起作用:

return Observable.combineLatest(
            mBehaviorSubjectArg1,
            mBehaviorSubjectArg2,
            mBehaviorSubjectArg3,
            (arg1, arg2, arg3) -> new Arguments(arg1, arg2, arg3)
        ).switchMap((args) -> mCalculator.run(args).subscribeOn(Schedulers.computation()))

switchMap将确保只有最新的Arguments实例将同时运行,如果combineLatest发出新实例,则取消可观察量。switchMap内的subscribeOn是为了确保不会因mCalculator.run错误而发生死锁

相关内容

最新更新