我试图调用存储过程3次。当我在下面运行代码时,从最后一次调用到存储过程的数据只显示在resultSet.getRows()
中。前两次调用存储过程的数据不会出现在resultSet中。以下是我的代码。我做错什么了吗?有人能帮忙吗?
String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
JsonArray jsonArray = new JsonArray();
database.dbObject().getConnectionObservable().subscribe(
connection -> {
Observable<ResultSet> resultSetObservable = connection.callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")),jsonArray ).
flatMap(result -> connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray ).
flatMap(result -> connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")),jsonArray );
resultSetObservable.subscribe(resultSet -> {
handler.handle(ReportUtils.parseSQLResult(resultSet.getRows()));
},error -> {
error.printStackTrace();
},connection::close);
},err -> {
err.printStackTrace();
}
);
你想要做的可以通过使用combinellatest操作符来实现,该操作符给出了所有可观察对象的结果(它将等待所有可观察对象给出结果)
Ref http://reactivex.io/documentation/operators/combinelatest.html
伪代码,
String currentPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.testProc(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
JsonArray jsonArray = new JsonArray();
database.dbObject().getConnectionObservable().subscribe(
connection -> {
resultSetObservable = Observable.combineLatest(firstCall, secCall, thirdCall)
firstCall = connection.callWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")),jsonArray )
secCall = result -> connection.callWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")), jsonArray )
thirdCall = result -> connection.callWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")),jsonArray )
resultSetObservable.subscribe(firstRes, secRes, thirdRes -> {
handler.handle(ReportUtils.parseSQLResult(resultSet.getRows()));
},error -> {
error.printStackTrace();
},connection::close);
},err -> {
err.printStackTrace();
}
);
@Bharath Mg。我已经修改了伪代码,它正在为我工作。
String currentPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String priorPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
String todayPeriod = String.format("{call %s.test(?)}", params.getJsonObject("databaseInfo").getString("dbName"));
database.dbObject().getConnectionObservable().subscribe(
connection -> {
Observable<ResultSet> firstCall = connection.queryWithParamsObservable(currentPeriod, new JsonArray().add(params.getString("testParams")));
Observable<ResultSet> secondCall = connection.queryWithParamsObservable(priorPeriod, new JsonArray().add(params.getString("testParams")));
Observable<ResultSet> thirdCall = connection.queryWithParamsObservable(todayPeriod, new JsonArray().add(params.getString("testParams")));
Observable.zip(firstCall, secondCall, thirdCall, new Func3<ResultSet, ResultSet, ResultSet, List<JsonObject>>() {
@Override
public List<JsonObject> call(ResultSet resultSet, ResultSet resultSet2, ResultSet resultSet3) {
List<JsonObject> allRecord = new ArrayList<JsonObject>();
allRecord.addAll(resultSet.getRows());
allRecord.addAll(resultSet2.getRows());
allRecord.addAll(resultSet3.getRows());
return allRecord;
}
}).subscribe(resultSet -> {
handler.handle(resultSet);
},error -> {
error.printStackTrace();
},connection::close);
},err -> {
err.printStackTrace();
}
);