RxJava:有没有某种链可以让我"wait until observables A, B, C are complete before continuing"?



在jQuery中,promise的概念是通过:实现的

var taskA = $.Deferred();
var taskB = $.Deferred();
var taskC = $.Deferred();
// Callback starts when all tasks are complete/resolved
$.when(taskA, taskB, taskC).done(function(results) {
    var resultA = results[0];
    var resultB = results[1];
    var resultC = results[2];
    // ...
});
// Fire off long tasks
longTask1(function(result) { taskA.resolve("result"); });
longTask2(function(result) { taskB.resolve("result"); });
longTask3(function(result) { taskC.resolve("result"); });

我觉得RxJava中应该有一个模棱两可的词,但我不知道它的名字

编辑:添加我的Android活动代码以澄清

/**
 * What I expected to happen:
 * - User has to click buttons A, B and C in order to pass.
 *
 * What actually happens
 * - User clicks any button and passes.
 */
public class RxTestActivity extends AppCompatActivity {
    PublishSubject<Boolean> subjectClickedA;
    PublishSubject<Boolean> subjectClickedB;
    PublishSubject<Boolean> subjectClickedC;
    @Bind(R.id.btnA) Button btnA;
    @Bind(R.id.btnB) Button btnB;
    @Bind(R.id.btnC) Button btnC;
    @Bind(R.id.labelTest) TextView txtSuccess;

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_test);
        ButterKnife.bind(this);
        subjectClickedA = PublishSubject.create();
        subjectClickedB = PublishSubject.create();
        subjectClickedC = PublishSubject.create();
        // Create an observer which waits until all 3 tasks are complete before
        // triggering it's own "onCompleted".
        Observable.merge(subjectClickedA, subjectClickedB, subjectClickedC)
            // Ensure all subjects are completed
            // TODO: doesn't work
            .all(new Func1<Boolean, Boolean>() {
                @Override
                public Boolean call(Boolean aBoolean) {
                    // Won't let anything through until all of tasks are complete
                    return subjectClickedA.hasCompleted() && subjectClickedB.hasCompleted() && subjectClickedC.hasCompleted();
                }
            })
            .subscribe(new Subscriber<Boolean>() {
                @Override
                public void onError(Throwable e) {
                }
                @Override
                public void onNext(Boolean aBoolean) {
                    // aBoolean value comes in as false
                    Log.e("Subscriber onNext", String.valueOf(aBoolean));
                }
                @Override
                public void onCompleted() {
                    // This triggers on the first button click (no matter which one)
                    // TODO: reward user
                    Log.e("Subscriber", "all tasks completed!");
                    txtSuccess.setText("Great success!");
                }
            });
    }
    @OnClick(R.id.btnA)
    protected void onButtonAClicked() {
        subjectClickedA.onNext(true);
//        subjectClickedA.onCompleted();
        btnA.setEnabled(false);
    }
    @OnClick(R.id.btnB)
    protected void onButtonBClicked() {
        subjectClickedB.onNext(true);
//        subjectClickedB.onCompleted();
        btnB.setEnabled(false);
    }
    @OnClick(R.id.btnC)
    protected void onButtonCClicked() {
        subjectClickedC.onNext(true);
//        subjectClickedC.onCompleted();
        btnC.setEnabled(false);
    }
}

您可以通过多种方式组合多个Observable。

请调查一下。组合可观察

本节介绍可用于组合多个Observable的运算符。

  • startWith() —在开始之前发出指定的项目序列从Observable发射项目
  • 合并() —将多个Observable合并为一个
  • 合并延迟错误() —将多个Observable合并为一个,允许无错误的Observables在传播错误之前继续
  • 拉链() —组合两个或多个Observable发射的项目集通过指定的函数组合在一起,并根据结果发射项目该函数的(rxjava联接)和(),那么(),以及何时() —通过以下方式组合两个或多个Observable发射的项目集模式和计划中介
  • combine最新() —当一个项目由两个Observable,通过指定的函数并基于此结果发射项函数
  • 联接()和groupJoin() —将两个发射的项目合并每当一个Observable中的一个项目落入由另一个发出的项目指定的持续时间窗口可观察
  • 切换到下一个() —将发射Observable的Observable转换为单个Observable,它发出最近发出的项目这些观测到的

相关内容

最新更新