我有一些非常简单的代码,阅读一堆字符串并应用过滤器。我希望过滤器在多个线程上运行。
Iterable<String> outputs = Observable
.from(Files.readLines(new File("E:\SAMA\Test\ImageNetBullets.txt"), Charset.forName("utf-8")))
.take(20).subscribeOn(Schedulers.from(threadPoolExecutor)).filter(str -> isURLOK(str))
.toBlocking().toIterable();
从日志来看,Filter 方法似乎只在 1 个线程上运行:
In Thread pool-1-thread-1
In Thread pool-1-thread-1
http://farm2.static.flickr.com/1258/1479683334_3ff920d217.jpg
In Thread pool-1-thread-1
In Thread pool-1-thread-1
如何加快速度?
RxJava 本质上是顺序的。例如,使用 map(Func1)
,Func1
本身将与通过父序列传递的值非同时执行:
Observable.range(1, 10).map(v -> v * v).subscribe(System.out::println);
在这里,lambda v -> v * v 将按顺序调用值 1 到 10。
RxJava 可以是异步的,就像管道中的阶段(range->map->subscribe(可以相对于彼此并发/并行发生一样。例如:
Observable.range(1, 10)
.subscribeOn(Schedulers.computation())
.map(v -> v * v) // (1)
.observeOn(Schedulers.io())
.map(v -> -v) // (2)
.toBlocking()
.subscribe(System.out::println); // (3)
这里,(1(可以与(2(和(3(并行运行,即,而(2(计算一个v = 3 * 3,(1(可能已经计算出v = 5并且(3(同时打印出-1
。
如果你想同时处理序列的元素,你必须将序列"分叉"成子Observable
,然后用flatMap
连接回结果:
Observable.range(1, 10)
.flatMap(v ->
Observable.just(v)
.subscribeOn(Schedulers.computation())
.map(v -> v * v)
)
.toBlocking()
.subscribe(System.out::println);
在这里,v
的每个值都将启动一个新的Observable
,该在后台线程上运行并通过那里的 map(( 进行计算。 v = 1
可以在线程 1 上运行,v = 2
可以在线程 2 上运行,v = 3
可以在线程 1 上运行,但严格在计算v = 1
之后运行。
对 .subscribeOn
的调用只是确定将启动可观察量Scheduler
(例如,所有排放都将在调度程序提供的一个线程上传输(。
如果您没有太多工作要处理流中的每个项目,则处理可能由 IO 主导,因此并行处理可能无济于事。
一般来说,一种方法是将流缓冲成块,并在Schedulers.computation()
上订阅的flatMap
中处理每个块:
Observable<String> outputs =
lines
.buffer(1000)
.flatMap(list ->
Observable
.from(list)
//do something computationally expensive
.filter(line -> intensive(line))
.subscribeOn(Schedulers.computation()));
之所以使用buffer
,是因为与安排大量小任务相比,安排体面的工作块的开销更少。