RxJava 中的并行性 - 过滤器



我有一些非常简单的代码,阅读一堆字符串并应用过滤器。我希望过滤器在多个线程上运行。

    Iterable<String> outputs = Observable
            .from(Files.readLines(new File("E:\SAMA\Test\ImageNetBullets.txt"), Charset.forName("utf-8")))
            .take(20).subscribeOn(Schedulers.from(threadPoolExecutor)).filter(str -> isURLOK(str))
            .toBlocking().toIterable();

从日志来看,Filter 方法似乎只在 1 个线程上运行:

In Thread pool-1-thread-1
In Thread pool-1-thread-1
http://farm2.static.flickr.com/1258/1479683334_3ff920d217.jpg
In Thread pool-1-thread-1
In Thread pool-1-thread-1

如何加快速度?

RxJava 本质上是顺序的。例如,使用 map(Func1)Func1本身将与通过父序列传递的值非同时执行:

Observable.range(1, 10).map(v -> v * v).subscribe(System.out::println);

在这里,lambda v -> v * v 将按顺序调用值 1 到 10。

RxJava 可以是异步的,就像管道中的阶段(range->map->subscribe(可以相对于彼此并发/并行发生一样。例如:

Observable.range(1, 10)
.subscribeOn(Schedulers.computation())
.map(v -> v * v)                       // (1)
.observeOn(Schedulers.io())
.map(v -> -v)                          // (2)
.toBlocking()
.subscribe(System.out::println);       // (3)
这里,(1(可以与(2(

和(3(并行运行,即,而(2(计算一个v = 3 * 3,(1(可能已经计算出v = 5并且(3(同时打印出-1

如果你想同时处理序列的元素,你必须将序列"分叉"成子Observable,然后用flatMap连接回结果:

Observable.range(1, 10)
.flatMap(v -> 
    Observable.just(v)
    .subscribeOn(Schedulers.computation())
    .map(v -> v * v)
)
.toBlocking()
.subscribe(System.out::println);

在这里,v的每个值都将启动一个新的Observable,该在后台线程上运行并通过那里的 map(( 进行计算。 v = 1可以在线程 1 上运行,v = 2可以在线程 2 上运行,v = 3可以在线程 1 上运行,但严格在计算v = 1之后运行。

.subscribeOn 的调用只是确定将启动可观察量Scheduler(例如,所有排放都将在调度程序提供的一个线程上传输(。

如果您没有太多工作要处理流中的每个项目,则处理可能由 IO 主导,因此并行处理可能无济于事。

一般来说,一种方法是将流缓冲成块,并在Schedulers.computation()上订阅的flatMap中处理每个块:

 Observable<String> outputs = 
   lines
    .buffer(1000)
    .flatMap(list -> 
        Observable
          .from(list)
          //do something computationally expensive
          .filter(line -> intensive(line))
          .subscribeOn(Schedulers.computation()));

之所以使用buffer,是因为与安排大量小任务相比,安排体面的工作块的开销更少。

相关内容

  • 没有找到相关文章

最新更新