>我正在尝试压缩两个在不同线程上发出的可观察量:
Observable<String> xxxx1 = Observable.fromCallable((Func0<String>) () -> {
try {
Thread.sleep((long)(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "First";
})
.doOnNext(s -> Log.d("TEEEST", "1 onNext " + s + " thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.computation());
Observable<String> xxxx2 = Observable.fromCallable((Func0<String>) () -> {
try {
Thread.sleep((long)(Math.random() * 1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
return "Second";
})
.doOnNext(s -> Log.d("TEEEST", "2 onNext " + s + " thread " + Thread.currentThread().getName()))
.subscribeOn(Schedulers.io());
Observable.zip(xxxx1, xxxx2, (s1, s2) -> {
Log.d("TEEEST", "zip func thread " + Thread.currentThread().getName());
return s1.concat(s2);
})
.map(s -> {
Log.d("TEEEST", "map thread " + Thread.currentThread().getName());
return s.concat(" mapped");
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(s -> {
Log.d("TEEEST", "call " + s + " thread " + Thread.currentThread().getName());
});
似乎zip在最后一个发出值的线程上工作,这是我的日志的样子:
首次运行:
D/TEEEST: 2 onNext Second thread RxIoScheduler-3
D/TEEEST: 1 onNext First thread RxComputationScheduler-1
D/TEEEST: zip func thread RxComputationScheduler-1
D/TEEEST: map thread RxComputationScheduler-1
D/TEEEST: call FirstSecond mapped thread main
第二次运行:
D/TEEEST: 1 onNext First thread RxComputationScheduler-2
D/TEEEST: 2 onNext Second thread RxIoScheduler-2
D/TEEEST: zip func thread RxIoScheduler-2
D/TEEEST: map thread RxIoScheduler-2
D/TEEEST: call FirstSecond mapped thread main
- 此行为是否记录在某处?
- 为什么会这样。
- 如何确保zip函数和所有下游内容(在我的情况下是
map
运算符(在特定调度程序上运行,而不是在随机调度程序上工作。
默认情况下,zip
在特定Scheduler
上运行。
zip
仅在具有要压缩的所有值时发出。因此,它在收到最后一个值的同一线程上发出。
为了确保所有下游操作都发生在特定的调度程序上,您必须定义一个observeOn()
副作用。
对于所有下游操作,观察压缩结果就足够了。
Observable.zip(...).observeOn(scheduler)
对于上游,您必须观察在此特定调度程序上压缩的观察器。
Observable.zip(o1.observeOn(scheduler), o2.observeOn(scheduler), ...)
根据您要使用的调度程序,这并不能保证线程。
RXJava 的美妙之处在于,RxJava 中没有运算符在等待来自上游可观察量的值时阻塞线程,而不是保留线程和阻塞,运算符重用上游观察者向其发出值的线程。
该行为不限于压缩。你可以观察到其他运算符相同的行为,例如:flatMap。
线程从提供的池中选取,因为它们可用。
我想在他的允许下稍微修改@tynn的代码内容!
假设我想执行以下两个功能。
fetchFromGoogle = fetchFromGoogle.subscribeOn(Schedulers.newThread());
fetchFromYahoo = fetchFromYahoo.subscribeOn(Schedulers.newThread());
在这里,我们可以使用 #zip 运算符,
// Fetch from both API
Observable<String> zipped
= Observable.zip(fetchFromGoogle, fetchFromYahoo, new Func2<String, String, String>() {
@Override
public String call(String google, String yahoo) {
// Do something with the results of both threads
return google + "n" + yahoo;
}
});
此外,您可以使用 #concat 运算符逐个运行线程。
Observable<String> concatenated = Observable.concat(fetchFromGoogle, fetchFromYahoo);
// Emit the results one after another