我有一个任务列表,这些任务应该在新线程中逐个处理,然后结果应该由某个主线程显示在方法中。但是,这似乎不起作用,flatMap
方法在主线程中调用。
在这种情况下,为什么 subscribeOn
方法不处理"线程切换"?
在另一个线程中执行某些工作的更好模式是什么?(除了使用Observable.create
和手动创建新线程,非常冗长)
List<Task> tasks = ...;
Observable.from(tasks)
.flatMap(task -> {
// should be handled in a new thread
try {
return Observable.just(task.call());
} catch (Exception e) {
log.error("Error", e);
}
return Observable.empty();
})
.subscribeOn(Schedulers.newThread())
.observeOn(MySchedulers.main())
.subscribe(this::show); // subscribe called from main thread
警告:我不是Java程序员,而是C#,所以所有奇怪的骆驼大小写方法名称都吓坏了我,让我感到困惑。
如果要为flatMap
操作使用新线程,则subscribeOn
位于错误的位置。将其插入 from
和 flatMap
之间。有关subscribeOn
和observeOn
的完整解释,请参阅此答案 - 它是为 .NET 编写的,但原理是相同的。
我不熟悉 Java 中的任务,所以我不确定你的Task
是否像.NET 的Task
以及task.call()
是否异步并启动自己的线程 - 我想不是来自您的问题,因为您说"......应在新线程中逐个处理的任务列表"。
newThread
调度程序为每个订阅者使用一个新线程 - 由于flatMap
将进行单个订阅,因此所有task.call
调用都将在同一线程上进行,尽管该线程与from
运算符的线程不同。
如果task.call
实际上是异步的,那么结果将根据它引入并发性返回,这将独立于 Rx 的语义。
无论哪种方式,(正确放置的)observeOn
都会导致结果传递到主线程上的this::show
。