给定一个昂贵但需要可变时间的Observable<Input>
和映射函数Function<Input, Output>
,有没有办法在多个输入上并行调用映射函数,并按它们的生成顺序接收输出?
我尝试将observeOn()
与多线程Scheduler
一起使用:
PublishSubject<Input> inputs = PublishSubject.create();
Function<Input, Output> mf = ...
Observer<Output> myObserver = ...
// Note: same results with newFixedThreadPool(2)
Executor exec = Executors.newWorkStealingThreadPool();
// Use ConnectableObservable to make sure mf is called only once
// no matter how many downstream observers
ConnectableObservable<Output> outputs = inputs
.observeOn(SchedulersFrom(exec))
.map(mf)
.publish();
outputs.subscribe(myObserver1);
outputs.subscribe(myObserver2);
outputs.connect();
inputs.onNext(slowInput); // `mf.apply()` takes a long time to complete on this input
inputs.onNext(fastInput); // `mf.apply()` takes a short time to complete on this input
但在测试中,mf.apply(fastInput)
直到mf.apply(slowInput)
完成后才会被调用。
如果我在测试中使用CountDownLatch
玩一些技巧以确保mf.apply(slowInput)
直到mf.apply(fastInput)
之后才能完成,程序就会死锁。
我应该在这里使用一些简单的运算符,还是仅仅违背 RxJava 的粒度而使Observables
乱序,我应该使用不同的技术?
ETA:我考虑过使用ParallelFlowable
(在订阅之前将其转换回带有.sequential()
的普通Flowable
myObserver1/2
,或者更确切地说是mySubscriber1/2
(,但后来我收到了额外的mf.apply()
调用,每个输入每个Subscriber
一个。有ConnectableFlowable
,但我没有太多运气弄清楚如何将其与.parallel()
混合。
我想observeOn
运算符不支持单独并发执行。那么,使用flatMap怎么样?假设mf
函数需要大量时间。
ConnectableObservable<Output> outputs = inputs
.flatMap(it -> Observable.just(it)
.observeOn(SchedulersFrom(exec))
.map(mf))
.publish();
或
ConnectableObservable<Output> outputs = inputs
.flatMap(it -> Observable.just(it)
.map(mf))
.subscribeOn(SchedulersFrom(exec))
.publish();
编辑 2019-12-30
如果要并发运行任务,但应该保持顺序,请使用运算符而不是concatMapEager
flatMap
。
ConnectableObservable<Output> outputs = inputs
.concatMapEager(it -> Observable.just(it) // here
.observeOn(SchedulersFrom(exec))
.map(mf))
.publish();
对我来说听起来是不可能的,除非 Rx 有一些非常专业的运算符来这样做。如果使用flatMap
进行映射,则元素将无序到达。或者您可以使用concatMap
但随后您将失去所需的并行映射。
编辑:正如另一张海报所提到的,concatMapEager 应该为此工作。并行订阅和按顺序结果。