RxJava .subscribeOn(Schedulers.newThread()) questions



我在普通的JDK 8上。我有一个简单的RxJava示例:

Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word, Thread.currentThread().getName()))
//.subscribeOn(Schedulers.newThread())
.subscribe(word -> System.out.println(word));

它逐行打印出单词,与有关线程的信息交织在一起,正如预期的那样,线程是所有后续调用的"主要"。

但是,当我取消注释subscribeOn(Schedulers.newThread())调用时,根本没有打印任何内容。为什么它不起作用?我本来希望它为每个onNext()调用启动一个新线程,并doOnNext()打印该线程的名称。现在,我什么也没看到,其他调度程序也是如此。

当我在 main 末尾添加对 Thread.sleep(10000L) 的调用时,我可以看到输出,这表明 RxJava 使用的线程都是守护进程。是这样吗?是否可以以某种方式更改此设置,但使用自定义线程工厂或类似概念,而不必实现自定义调度程序?

通过上述更改,线程名称始终是RxNewThreadScheduler-1,而 newThread 的文档说"为每个工作单元创建新{@link Thread}的调度程序"。难道它不应该为所有排放创建一个新线程吗?

正如 Vladimir 所提到的,RxJava 标准调度程序在守护进程线程上运行工作,这些线程在您的示例中终止,因为主线程退出。我想强调的是,他们不会在新线程上调度每个值,而是在新创建的线程上为每个订阅者安排值流。第二次订阅会给你"RxNewThreadScheduler-2"。

您实际上不需要更改默认调度程序,只需使用 Schedulers.from() 包装您自己的基于 Executor 的调度程序,并在需要时将其作为参数提供:

ThreadPoolExecutor exec = new ThreadPoolExecutor(
        0, 64, 2, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
exec.allowCoreThreadTimeOut(true);
Scheduler s = Schedulers.from(exec);
Observable
.from(Arrays.asList("one", "two", "three"))
.doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
    Thread.currentThread().getName()))
.subscribeOn(s)
.subscribe(word -> System.out.println(word));

我有一系列关于RxJava调度器的博客文章,它们应该可以帮助你实现一个"更永久"的变体。

与新手的看法相反,反应式流本质上不是并发的,而是固有的异步的。它们本质上也是顺序的,必须在流中配置并发性。简而言之,反应式流在其末端自然是顺序的,但在其核心可以是并发的

秘诀是在流中使用 flatMap() 运算符。此运算符从源流中获取 Observable 输入,并在内部将其作为 Observable> 流重新发出,它一次订阅了所有实例 只要 flatMap() 内部流在多线程上下文中执行,它就会并发执行提供的 Function<T、R> 应用您的逻辑,最后,在原始流上重新发出结果作为它自己的发射。

这听起来很复杂(乍一看相当复杂),但带有解释的简单示例有助于理解这个概念。

从类似问题中查找更多详细信息,以及有关 RxJava2 调度程序和并发性的文章,其中包含代码示例以及如何按顺序和并发使用调度程序的详细说明。

希望这有帮助,

软杰克

public class MainClass {
    public static void main(String[] args) {
        Scheduler scheduler = Schedulers.from(Executors.newFixedThreadPool(10, Executors.defaultThreadFactory()));
        Observable.interval(1,TimeUnit.SECONDS)
                .doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
                        Thread.currentThread().getName()))
                .subscribeOn(scheduler)
                .observeOn(Schedulers.io())
                .doOnNext(word -> System.out.printf("%s uses thread %s%n", word,
                        Thread.currentThread().getName()))
                .subscribe();
    }
}

相关内容

  • 没有找到相关文章

最新更新