使用RX Java操作员设计API流



我是RX-Java的新手,我正在尝试设计一个API,其中流量如下所述:

            Make REST call A to load data
                        |
                        |
        data not found? |  data found  
    ------------------------------------    
    |                                  |           
    |                                  |            
    |                                  |
Make REST Call B                  Load DB Data 1     
    |                                  |            
    |                                  |
    |                              _________________________
    |                             |       Parallel         |
    |                             |                        |
    |                             |                        |
    |                  (condition using DB data 1)   (condition using DB data 1)
    |                      Load REST Data C                Load DB Data 2
    |                             |                        |
    |                             |________________________|
    |                                         |
    |                                         |
Build Response                            Build Response

假设DB方法和服务调用可观察到可观察到的,需要使用RX运算符的上述骨架流动?

我将在下面共享封锁伪代码:

Response = REST_Call_1(); // on error throw Exception
if (isResponseValid(response)) { // returns Boolean
    if (responseUnderReview(response)) { // validation func 
        throw Exception;
    } else {
        //db_data_1 and db_data_2 can be parallel
        db_data_1 = Load_DB_Data_1();
        // Load data_3 and data_2 based on db_data_1
        if (is_data_3_required(db_data_1)) {
            data_3 = REST_call_2();
        }
        if (is_data_2_required(db_data_1)) {
            db_data_2 = REST_call_2();
        }
        buildResponse(db_data_1, db_data_2, data_3, Response);
    }
} else {
    Response = REST_Call_3(); // on error throw Exception
    buildResponse(response);
}

我正在寻找一种完整的非阻滞异步方法。

逻辑的一般流量如下:

retrofitClient
.loadData()...
.onErrorResumeNext(Observable.empty()) // or handle specific errors only
.flatMap(foundData -> 
    Observable.zip(
       database.call1(foundData),
       database.call2(foundData),
       (call1, call2) -> buildResponse(call1,call2)
    )
 )
 .switchIfEmpty(() ->
    retrofitClient
    .callB()
    .map(response -> buildResponse(response))
 )

请注意,如果流中有复杂的逻辑,我总是尝试将其提取到单独的方法中。在您的情况下,基于其余调用调用数据库可能涉及一些转换 - 如果结果逻辑超过一两行,我将其移动到单独的方法并使用RX流中的方法参考。p>这个想法是将流程作为可以在单个页面中查看和解析的东西,并在方法中隐藏实现细节。

编辑:编辑后,也许这更有意义:

REST_Call_1()
.filter(response -> isResponseValid(response))
.flatMap(response ->
     isResponseUnderReview(response)
     ? Observable.error(new Exception())
     : Observable.just(response)
)
.flatMap(foundData -> 
    Observable.zip(
       fetchData13(foundData),
       Load_DB_Data_2(foundData),
       (data13, call2) -> buildResponse(data13.getLeft(),call2,data13.getRight())
    )
 )
 .switchIfEmpty(() ->
    REST_Call_3()
    .flatMap(response -> buildResponse(response))
 )
 .subscribe(....)
 private Observable<Pair<DbData1, DbData3>> fetchData13(foundData) {
    return
    Load_DB_Data_1()
    .flatMap(data1 -> is_data_3_required(data1)
        ? REST_call_2().map(data3 -> Pair.of(data1, data3))
        : Pair.of(data1, null));
 }

tassos的答案对我来说很合理,我看不到任何其他问题,但是

// dataSource.getItemDetails(activityId returns List<ItemDetail> and is a blocking call
// So, I want to run it on a separate IO thread.
return Observable.from(dataSource.getItemDetails(activityId)).observeOn(Schedulers.io());

如果是这样,将阻止单元件调用转换为单线呼叫,则可以如下:

Observable.fromCallable(() -> yourBlockingCall())
.subscribeOn(Schedulers.io())
.flatMapIterable(v -> v)
...

Observable.defer(() -> Observable.from(yourBlockingCall()))
.subscribeOn(Schedulers.io())
...

编辑:基于图,我将设置以下流程:

serviceCallA()
.flatMap(a -> {
    if (dataFound(a)) {
        return dbCall1()
           .flatMap(db1 -> {
               Observable o1 = shouldCallServiceC(db1) 
                    ? serviceCallC() : just(placeholderC);
               Observable o2 = shouldCallDB2(db1) 
                    ? dbCall2() ? just(placeHolderDb2);
               return zip(o1, o2, (c, d) -> createResult(c, d));   
           });
    }
    return serviceCallB()
        .map(c -> mapToResultType(c));
});

最新更新