RxJava调度器的用例



在RxJava中有5种不同的调度器可供选择:

  1. immediate():创建并返回一个调度程序,在当前线程上立即执行工作。

  2. trampoline():创建并返回一个Scheduler,该Scheduler在当前线程上排队等待当前工作完成后执行。

  3. newThread():创建并返回一个Scheduler,为每个工作单元创建一个新的Thread。

  4. computation():创建并返回一个用于计算工作的Scheduler。这可以用于事件循环、处理回调和其他计算工作。不要在此调度程序上执行io绑定工作。使用调度器。 io ()

  5. io():创建并返回一个用于io绑定工作的Scheduler。该实现由Executor线程池支持,该线程池将根据需要增长。这可以用于异步执行阻塞IO。不要在这个调度器上执行计算工作。computation()代替

问题:

前3个调度程序是相当不言自明的;然而,我对计算io有点困惑。

  1. "io绑定工作"到底是什么?它是否用于处理流(java.io)和文件(java.nio.files)?是否用于数据库查询?它是否用于下载文件或访问REST api ?
  2. computation()与newThread()有何不同?它是所有的计算()调用是在一个单一的(后台)线程,而不是一个新的(后台)线程每次?为什么在做IO工作时调用computation()是不好的?
  3. 为什么在做计算工作时调用io()不好?

很好的问题,我认为文档可以做更多的细节。

  1. io()是由一个无界线程池支持的,是那种你会用于非计算密集型任务的东西,那是不会给CPU带来太多负载的东西。是的,与文件系统的交互,与不同主机上的数据库或服务的交互都是很好的例子。
  2. computation()由一个有限的线程池支持,其大小等于可用处理器的数量。如果你试图在多个可用的处理器上并行调度CPU密集型工作(比如使用newThread()),那么当线程争夺一个处理器时,你就要承担线程创建开销和上下文切换开销,这可能会对性能造成很大的影响。
  3. 最好只把computation()留给CPU密集型的工作,否则你不会得到好的CPU利用率。
  4. 由于2中讨论的原因,调用io()进行计算工作是不好的。io()是无界的,如果你在io()上并行调度一千个计算任务,那么这一千个任务中的每一个都将有自己的线程,并且会竞争CPU,从而产生上下文切换成本。

最重要的一点是这两个调度器。ioSchedulers.computation由无界线程池支持,而不是问题中提到的其他线程池。此特性仅在Executor使用newCachedThreadPool(不受自动回收线程池的限制)创建的情况下,由Schedulers.from(Executor)共享。

正如在之前的回复和web上的多篇文章中大量解释的那样,调度器。ioSchedulers.computation应谨慎使用,因为它们针对其名称中的工作类型进行了优化。但是,在我看来,它们最重要的作用是为响应流提供真正的并发性。

与新来者的看法相反,响应式流不是固有的并发,而是固有的异步和顺序。由于这个原因,调度器。io应仅在I/O操作阻塞时使用(例如:使用阻塞命令,如Apache IOUtils FileUtils.readFileAsString(…)),因此将冻结调用线程,直到操作完成。

使用异步方法(如Java AsynchronousFileChannel(…))不会在操作期间阻塞调用线程,因此使用单独的线程没有意义。实际上,调度程序。Io 线程并不真正适合异步操作,因为它们不运行事件循环,回调永远不会…被称为.

同样的逻辑也适用于数据库访问或远程API调用。不要使用调度器。io如果你可以使用异步或响应式API来进行调用。

回到并发。您可能无法访问异步或响应式API来异步或并发地执行I/O操作,因此您唯一的选择是在单独的线程上调度多个调用。遗憾的是,响应式流在其末端是顺序的,但好消息是flatMap()操作符可以在其核心引入并发性。并发性必须在流构造中构建,通常使用flatMap()操作符。这个强大的操作符可以被配置为在内部为你的flatMap()内嵌函数提供一个多线程上下文。该上下文由多线程调度程序(如Scheduler)提供。. ioScheduler.computation

在RxJava2 调度器和并发的文章中找到更多细节,其中您将找到关于如何顺序地和并发地使用调度器的代码示例和详细解释。

希望有帮助,

Softjake

这篇博文提供了一个很好的答案

摘自博客文章:

Schedulers.io()由无界线程池支持。它用于非cpu密集型I/O类型的工作,包括与文件系统的交互、执行网络调用、数据库交互等。这个线程池是用来异步执行阻塞IO的。

Schedulers.computation()由一个有界线程池支持,该线程池的大小可达可用处理器的数量。它用于计算或cpu密集型工作,如调整图像大小、处理大型数据集等。注意:当分配的计算线程多于可用内核时,由于上下文切换和线程创建开销(线程争夺处理器时间),性能将会下降。

Schedulers.newThread()为调度的每个工作单元创建一个新线程。这个调度器开销很大,因为每次都会生成新的线程,而且不会重用。

Schedulers.from(Executor Executor)创建并返回一个由指定执行器支持的自定义调度程序。要限制线程池中并发线程的数量,请使用Scheduler.from(Executors.newFixedThreadPool(n))。这保证了如果在所有线程都被占用时调度任务,它将被排队。在显式关闭池之前,池中的线程将一直存在。

主线程或AndroidSchedulers.mainThread()由RxAndroid扩展库提供给RxJava。主线程(也称为UI线程)是用户交互发生的地方。应该小心不要重载这个线程,以防止janky无响应的UI,或者更糟糕的是,应用程序不响应(ANR)对话框。

Schedulers.single()是RxJava 2中的新特性。这个调度器由一个线程支持,按照请求的顺序依次执行任务。

Schedulers.trampoline()由一个参与的工作线程以FIFO(先进先出)的方式执行任务。它通常在实现递归时使用,以避免增加调用堆栈。

相关内容

  • 没有找到相关文章

最新更新