在单个可观察对象上并行化 map() 操作,并无序接收结果



给定一个昂贵但需要可变时间的Observable<Input>和映射函数Function<Input, Output>,有没有办法在多个输入上并行调用映射函数,并按它们的生成顺序接收输出?

我尝试将observeOn()与多线程Scheduler一起使用:

PublishSubject<Input> inputs = PublishSubject.create();
Function<Input, Output> mf = ...
Observer<Output> myObserver = ...
// Note: same results with newFixedThreadPool(2)
Executor exec = Executors.newWorkStealingThreadPool();
// Use ConnectableObservable to make sure mf is called only once
// no matter how many downstream observers
ConnectableObservable<Output> outputs = inputs
.observeOn(SchedulersFrom(exec))
.map(mf)
.publish();
outputs.subscribe(myObserver1);
outputs.subscribe(myObserver2);
outputs.connect();
inputs.onNext(slowInput); // `mf.apply()` takes a long time to complete on this input
inputs.onNext(fastInput); // `mf.apply()` takes a short time to complete on this input

但在测试中,mf.apply(fastInput)直到mf.apply(slowInput)完成后才会被调用。

如果我在测试中使用CountDownLatch玩一些技巧以确保mf.apply(slowInput)直到mf.apply(fastInput)之后才能完成,程序就会死锁。

我应该在这里使用一些简单的运算符,还是仅仅违背 RxJava 的粒度而使Observables乱序,我应该使用不同的技术?


ETA:我考虑过使用ParallelFlowable(在订阅之前将其转换回带有.sequential()的普通FlowablemyObserver1/2,或者更确切地说是mySubscriber1/2(,但后来我收到了额外的mf.apply()调用,每个输入每个Subscriber一个。有ConnectableFlowable,但我没有太多运气弄清楚如何将其与.parallel()混合。

我想observeOn运算符不支持单独并发执行。那么,使用flatMap怎么样?假设mf函数需要大量时间。

ConnectableObservable<Output> outputs = inputs
.flatMap(it -> Observable.just(it)
.observeOn(SchedulersFrom(exec))
.map(mf))
.publish();

ConnectableObservable<Output> outputs = inputs
.flatMap(it -> Observable.just(it)
.map(mf))
.subscribeOn(SchedulersFrom(exec))
.publish();

编辑 2019-12-30

如果要并发运行任务,但应该保持顺序,请使用运算符而不是concatMapEagerflatMap

ConnectableObservable<Output> outputs = inputs
.concatMapEager(it -> Observable.just(it) // here
.observeOn(SchedulersFrom(exec))
.map(mf))
.publish();

对我来说听起来是不可能的,除非 Rx 有一些非常专业的运算符来这样做。如果使用flatMap进行映射,则元素将无序到达。或者您可以使用concatMap但随后您将失去所需的并行映射。

编辑:正如另一张海报所提到的,concatMapEager 应该为此工作。并行订阅和按顺序结果。

相关内容

  • 没有找到相关文章

最新更新