Data is not aggregated in the resultSet of a Java rx Observable



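I am trying to call a stored procedure three times. When I run the code below, resultSet.getRows() contains only the data from the last call to the stored procedure; the data from the first two calls does not appear in the resultSet. My code is below. Am I doing something wrong? Can anyone help?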

String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod   = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod   = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
 JsonArray jsonArray = new JsonArray();
        database.dbObject().getConnectionObservable().subscribe(
                connection -> {
                    // Each flatMap lambda ignores `result`, so only the last call's ResultSet is emitted downstream.
                    Observable<ResultSet> resultSetObservable = connection.callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")), jsonArray)
                            .flatMap(result -> connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray))
                            .flatMap(result -> connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")), jsonArray));
                    resultSetObservable.subscribe(resultSet -> {
                        handler.handle(ReportUtils.parseSQLResult(resultSet.getRows()));
                    },error -> {
                        error.printStackTrace();
                    },connection::close);
                },err -> {
                    err.printStackTrace();
                }
        );

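What you are trying to do can be achieved with the combineLatest operator, which combines the results of all the observables (it waits until every observable has emitted at least one item before emitting a combined result).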

Ref: http://reactivex.io/documentation/operators/combinelatest.html
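To illustrate the difference between chaining the calls and combining them, here is a minimal sketch using only the JDK's CompletableFuture as a stand-in for the Rx observables. This is an analogy, not the Vert.x or RxJava API: thenCompose plays the role of flatMap, thenCombine plays the role of a combining operator, and the class name, the call helper and the row strings are all hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

class ChainVersusCombine {

    // Hypothetical stand-in for one stored-procedure call: completes with that call's rows.
    static CompletableFuture<List<String>> call(String period) {
        return CompletableFuture.supplyAsync(
                () -> Arrays.asList(period + "-row1", period + "-row2"));
    }

    // Chained like the flatMap calls in the question: each lambda ignores the
    // previous result, so only the rows of the last call reach the caller.
    static List<String> chained() {
        return call("current")
                .thenCompose(ignored -> call("prior"))
                .thenCompose(ignored -> call("today"))
                .join();
    }

    // Combined: wait for all three calls, then merge their rows into one list.
    static List<String> combined() {
        CompletableFuture<List<String>> first = call("current");
        CompletableFuture<List<String>> second = call("prior");
        CompletableFuture<List<String>> third = call("today");
        return first
                .thenCombine(second, (a, b) -> {
                    List<String> rows = new ArrayList<>(a); // copy: Arrays.asList is fixed-size
                    rows.addAll(b);
                    return rows;
                })
                .thenCombine(third, (ab, c) -> {
                    ab.addAll(c);
                    return ab;
                })
                .join();
    }

    public static void main(String[] args) {
        System.out.println(chained());  // [today-row1, today-row2]
        System.out.println(combined()); // rows of current, prior and today, in that order
    }
}
```

The chained version returns two rows and the combined version returns six, which mirrors the symptom in the question: nothing in the chain ever hands the earlier results to the subscriber.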

Pseudocode:

    String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
    String priorPeriod   = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
    String todayPeriod   = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
    JsonArray jsonArray = new JsonArray();

    database.dbObject().getConnectionObservable().subscribe(
            connection -> {
                // Build the three calls first; none of them depends on another call's result.
                Observable<ResultSet> firstCall = connection.callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")), jsonArray);
                Observable<ResultSet> secCall   = connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray);
                Observable<ResultSet> thirdCall = connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")), jsonArray);
                // combineLatest needs a combiner function; here it merges the rows of all three results.
                Observable<List<JsonObject>> resultSetObservable = Observable.combineLatest(firstCall, secCall, thirdCall,
                        (firstRes, secRes, thirdRes) -> {
                            List<JsonObject> rows = new ArrayList<>();
                            rows.addAll(firstRes.getRows());
                            rows.addAll(secRes.getRows());
                            rows.addAll(thirdRes.getRows());
                            return rows;
                        });
                resultSetObservable.subscribe(rows -> {
                    handler.handle(ReportUtils.parseSQLResult(rows));
                },error -> {
                    error.printStackTrace();
                },connection::close);
            },err -> {
                err.printStackTrace();
            }
    );

@Bharath Mg: I modified the pseudocode, and it now works for me.

String currentPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod   = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod   = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));

database.dbObject().getConnectionObservable().subscribe(
        connection -> {
            Observable<ResultSet> firstCall  = connection.queryWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")));
            Observable<ResultSet> secondCall = connection.queryWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")));
            Observable<ResultSet> thirdCall  = connection.queryWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")));
            Observable.zip(firstCall, secondCall, thirdCall, new Func3<ResultSet, ResultSet, ResultSet, List<JsonObject>>() {
                @Override
                public List<JsonObject> call(ResultSet resultSet, ResultSet resultSet2, ResultSet resultSet3) {
                    List<JsonObject> allRecord = new ArrayList<JsonObject>();
                    allRecord.addAll(resultSet.getRows());
                    allRecord.addAll(resultSet2.getRows());
                    allRecord.addAll(resultSet3.getRows());
                    return allRecord;
                }
            }).subscribe(resultSet -> {
                handler.handle(resultSet);
            },error -> {
                error.printStackTrace();
            },connection::close);
        },err -> {
            err.printStackTrace();
        }
);
