RxJava 异步订阅



我有一个任务列表,这些任务应该在新线程中逐个处理,然后结果应该由某个主线程显示在方法中。但是,这似乎不起作用,flatMap方法在主线程中调用。

在这种情况下,为什么 subscribeOn 方法不处理"线程切换"?

在另一个线程中执行某些工作的更好模式是什么?(除了使用Observable.create和手动创建新线程,非常冗长)

List<Task> tasks = ...;
Observable.from(tasks)
          .flatMap(task -> {
                // should be handled in a new thread
                try {
                    return Observable.just(task.call());
                } catch (Exception e) {
                    log.error("Error", e);
                }
                return Observable.empty();
            })
            .subscribeOn(Schedulers.newThread())
            .observeOn(MySchedulers.main())
            .subscribe(this::show); // subscribe called from main thread

警告:我不是Java程序员,而是C#,所以所有奇怪的骆驼大小写方法名称都吓坏了我,让我感到困惑。

如果要为flatMap操作使用新线程,则subscribeOn位于错误的位置。将其插入 fromflatMap 之间。有关subscribeOnobserveOn的完整解释,请参阅此答案 - 它是为 .NET 编写的,但原理是相同的。

我不熟悉 Java 中的任务,所以我不确定你的Task是否像.NET 的Task以及task.call()是否异步并启动自己的线程 - 我想不是来自您的问题,因为您说"......应在新线程中逐个处理的任务列表"。

newThread调度程序为每个订阅者使用一个新线程 - 由于flatMap将进行单个订阅,因此所有task.call调用都将在同一线程上进行,尽管该线程与from运算符的线程不同。

如果task.call实际上是异步的,那么结果将根据它引入并发性返回,这将独立于 Rx 的语义。

无论哪种方式,(正确放置的)observeOn都会导致结果传递到主线程上的this::show

相关内容

  • 没有找到相关文章

最新更新