>问题:考虑有多个服务可以执行特定任务的场景。每个服务都可以以不同的时间量响应任务。我们需要始终从性能最快的服务中选择响应。
如果我理解正确,你需要这样的东西:
taskSource
.flatMap(task -> // for each task
Observable.merge(
// submit same task to multiple services
service1.submit(task),
service2.submit(task),
...,
serviceN.submit(task)
)
.take(1)) // take first response; discard others
... // continue processing result of the task
.subscribe(...)
您需要Observable.amb
运算符。它的优点是可以使用具有多个发射的可观测量。
下面是
使用RxJava Observables的示例工作代码,它从一组线程中打印出响应最快的线程的结果。
public static void main(String[] args) {
// Create a slow thread which spans 5 secs
Callable<String> task1 = new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(5000);
return "task1";
}
};
// Create a faster thread which spans 1 secs
Callable<String> task2 = new Callable<String>() {
@Override
public String call() throws Exception {
Thread.sleep(1000);
return "task2";
}
};
List<Callable<String>> tasks = new ArrayList<>();
tasks.add(task1);
tasks.add(task2);
String result = null;
try {
result = Observable.from(tasks)
.subscribeOn(Schedulers.computation())
.flatMap(eachTask -> Observable.fromCallable(eachTask)
.subscribeOn(Schedulers.io())
.doOnNext(e -> System.out.println("Executing your action on "+Thread.currentThread().getName()))
.doOnError(e -> System.out.println("Failed reason for : "+Thread.currentThread().getName()+" with error "+e.getMessage()))
)
.toBlocking()
.first();
} catch (Exception e) {
System.out.println(e.getMessage());
}
System.out.println("result--->"+result);
}