为什么 RxJava zip 运算符在上次发出值的线程上工作



>我正在尝试压缩两个在不同线程上发出的可观察量:

Observable<String> xxxx1 = Observable.fromCallable((Func0<String>) () -> {
    try {
        Thread.sleep((long)(Math.random() * 1000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "First";
})
        .doOnNext(s -> Log.d("TEEEST", "1 onNext " + s + " thread " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.computation());
Observable<String> xxxx2 = Observable.fromCallable((Func0<String>) () -> {
    try {
        Thread.sleep((long)(Math.random() * 1000));
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "Second";
})
        .doOnNext(s -> Log.d("TEEEST", "2 onNext " + s + " thread " + Thread.currentThread().getName()))
        .subscribeOn(Schedulers.io());
Observable.zip(xxxx1, xxxx2, (s1, s2) -> {
    Log.d("TEEEST", "zip func thread " + Thread.currentThread().getName());
    return s1.concat(s2);
})
        .map(s -> {
            Log.d("TEEEST", "map thread " + Thread.currentThread().getName());
            return s.concat(" mapped");
        })
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> {
            Log.d("TEEEST", "call " + s + " thread " + Thread.currentThread().getName());
        });

似乎zip在最后一个发出值的线程上工作,这是我的日志的样子:

首次运行:

D/TEEEST: 2 onNext Second thread RxIoScheduler-3
D/TEEEST: 1 onNext First thread RxComputationScheduler-1
D/TEEEST: zip func thread RxComputationScheduler-1
D/TEEEST: map thread RxComputationScheduler-1
D/TEEEST: call FirstSecond mapped thread main

第二次运行:

D/TEEEST: 1 onNext First thread RxComputationScheduler-2
D/TEEEST: 2 onNext Second thread RxIoScheduler-2
D/TEEEST: zip func thread RxIoScheduler-2
D/TEEEST: map thread RxIoScheduler-2
D/TEEEST: call FirstSecond mapped thread main
  1. 此行为是否记录在某处?
  2. 为什么会这样。
  3. 如何确保zip函数和所有下游内容(在我的情况下是map运算符(在特定调度程序上运行,而不是在随机调度程序上工作。

默认情况下,zip在特定Scheduler上运行。

zip 仅在具有要压缩的所有值时发出。因此,它在收到最后一个值的同一线程上发出。

为了确保所有下游操作都发生在特定的调度程序上,您必须定义一个observeOn()副作用。

对于所有下游操作,观察压缩结果就足够了。

Observable.zip(...).observeOn(scheduler)

对于上游,您必须观察在此特定调度程序上压缩的观察器。

Observable.zip(o1.observeOn(scheduler), o2.observeOn(scheduler), ...)

根据您要使用的调度程序,这并不能保证线程。

RXJava 的美妙之处在于,RxJava 中没有运算符在等待来自上游可观察量的值时阻塞线程,而不是保留线程和阻塞,运算符重用上游观察者向其发出值的线程。

该行为不限于压缩。你可以观察到其他运算符相同的行为,例如:flatMap。

线程从提供的池中选取,因为它们可用。

我想在他的允许下稍微修改@tynn的代码内容!

假设我想执行以下两个功能。

fetchFromGoogle = fetchFromGoogle.subscribeOn(Schedulers.newThread());
fetchFromYahoo = fetchFromYahoo.subscribeOn(Schedulers.newThread());

在这里,我们可以使用 #zip 运算符,

// Fetch from both API 
Observable<String> zipped 
    = Observable.zip(fetchFromGoogle, fetchFromYahoo, new Func2<String, String, String>() {
        @Override
        public String call(String google, String yahoo) { 
            // Do something with the results of both threads
            return google + "n" + yahoo;
        }
    });

此外,您可以使用 #concat 运算符逐个运行线程。

Observable<String> concatenated = Observable.concat(fetchFromGoogle, fetchFromYahoo);
// Emit the results one after another

最新更新