如何从一组线程中提取响应最快的线程的结果



>问题:考虑有多个服务可以执行特定任务的场景。每个服务都可以以不同的时间量响应任务。我们需要始终从性能最快的服务中选择响应。

如果我理解正确,你需要这样的东西:

taskSource
    .flatMap(task -> // for each task
        Observable.merge(
            // submit same task to multiple services
            service1.submit(task),
            service2.submit(task),
            ...,
            serviceN.submit(task)
            )
            .take(1)) // take first response; discard others
    ... // continue processing result of the task
    .subscribe(...)

您需要Observable.amb运算符。它的优点是可以使用具有多个发射的可观测量。

下面是

使用RxJava Observables的示例工作代码,它从一组线程中打印出响应最快的线程的结果。

    public static void main(String[] args) {
            // Create a slow thread which spans 5 secs
            Callable<String> task1 = new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                            Thread.sleep(5000);
                            return "task1";
                    }
            };
            // Create a faster thread which spans 1 secs
            Callable<String> task2 = new Callable<String>() {
                    @Override
                    public String call() throws Exception {
                            Thread.sleep(1000);
                            return "task2";
                    }
            };
            List<Callable<String>> tasks = new ArrayList<>();
            tasks.add(task1);
            tasks.add(task2);
            String result = null;
            try {
                    result =  Observable.from(tasks)
                                    .subscribeOn(Schedulers.computation())
                                    .flatMap(eachTask -> Observable.fromCallable(eachTask)
                                                    .subscribeOn(Schedulers.io())
                                                    .doOnNext(e -> System.out.println("Executing your action on "+Thread.currentThread().getName()))
                                                    .doOnError(e -> System.out.println("Failed reason for : "+Thread.currentThread().getName()+" with error "+e.getMessage()))
                                                    )
                                    .toBlocking()
                                    .first();
            } catch (Exception e) {
                    System.out.println(e.getMessage());
            }
            System.out.println("result--->"+result);
}

相关内容

最新更新