(Datastax 4.1.0) (Cassandra )如何使用session.executeAsync收集所有响应?



我想使用 execute 异步调用 cassandra db。异步调用 在曼努埃尔中,我找到了这段代码,但我无法理解如何将所有行收集到任何列表中。 非常基本的调用,例如从表中选择*,我想存储所有结果。

https://docs.datastax.com/en/developer/java-driver/4.4/manual/core/async/

CompletionStage<CqlSession> sessionStage = CqlSession.builder().buildAsync();
// Chain one async operation after another:
CompletionStage<AsyncResultSet> responseStage =
sessionStage.thenCompose(
session -> session.executeAsync("SELECT release_version FROM system.local"));
// Apply a synchronous computation:
CompletionStage<String> resultStage =
responseStage.thenApply(resultSet -> resultSet.one().getString("release_version"));
// Perform an action once a stage is complete:
resultStage.whenComplete(
(version, error) -> {
if (error != null) {
System.out.printf("Failed to retrieve the version: %s%n", error.getMessage());
} else {
System.out.printf("Server version: %s%n", version);
}
sessionStage.thenAccept(CqlSession::closeAsync);
});

您需要参考有关异步分页的部分 - 您需要提供一个回调,该回调会将数据收集到作为外部对象提供的列表中。文档有以下示例:

CompletionStage<AsyncResultSet> futureRs =
session.executeAsync("SELECT * FROM myTable WHERE id = 1");
futureRs.whenComplete(this::processRows);
void processRows(AsyncResultSet rs, Throwable error) {
if (error != null) {
// The query failed, process the error
} else {
for (Row row : rs.currentPage()) {
// Process the row...
}
if (rs.hasMorePages()) {
rs.fetchNextPage().whenComplete(this::processRows);
}
}
}

在这种情况下,processRows可以将数据存储在作为当前对象一部分的列表中,如下所示:

class Abc {
List<Row> rows = new ArrayList<>();
// call to executeAsync
void processRows(AsyncResultSet rs, Throwable error) {
....
for (Row row : rs.currentPage()) {
rows.add(row);
}
....
}
}

但是您需要非常小心select * from table因为它可能会返回很多结果,而且如果您的数据太多,它可能会超时 - 在这种情况下,最好执行令牌范围扫描(我有一个驱动程序 3.x 的示例,但还没有 4.x 的示例(。

这是 4.x 的示例(您还可以找到 4.4 BTW 中可用的响应式代码示例(

https://github.com/datastax/cassandra-reactive-demo/blob/master/2_async/src/main/java/com/datastax/demo/async/repository/AsyncStockRepository.java

最新更新