我想加载较大的一行数据,所以我的计划是将语句分成几个部分,除以时间戳,然后异步运行。
...
// List to save ResultSets
List<CompletableFuture<AsyncResultSet>> pending = new ArrayList<>();
for(Range range : ranges) {
System.out.println("Asynchronous execute query will be called soon!");
pending.add(executeQuery(session, preparedStatement, range));
}
...
private static CompletableFuture<AsyncResultSet> executeQuery(CqlSession session,
PreparedStatement preparedStatement, Range range) {
return session
.executeAsync(preparedStatement.bind()
.setInstant("startDateTime", range.getStartDateTime().toInstant())
.setInstant("endDateTime", range.getEndDateTime().toInstant())
.setPageSize(1000000))
.toCompletableFuture()
.whenCompleteAsync((asyncResultSet, throwable) -> {
if (throwable == null) {
System.out.println("Range " + range.getStart() + " to " + range.getEnd() +
" has " + asyncResultSet.remaining() + " records.");
fetchResultSet(asyncResultSet, throwable);
if(asyncResultSet.hasMorePages()) {
asyncResultSet.fetchNextPage().whenComplete(LoadCassandraAsync::fetchResultSet);
}
} else {
throwable.printStackTrace();
}
}, Executors.newFixedThreadPool(4))
.exceptionally(throwable -> {
throwable.printStackTrace();
return null;
});
}
我将随机退出代码0(不是来自主方法(,表示它已关闭。或者,在一次获取后,我将一无所获,就像有一个线程在运行但什么都不做一样。
如果我评论"行提取"部分,我得到:
...
Asynchronous execute query will be called soon!
Asynchronous execute query will be called soon!
Asynchronous execute query will be called soon!
Asynchronous execute query will be called soon!
Range 2020-02-14 00:00:00+0700 to 2020-02-14 01:00:00+0700 has 102974 records.
Range 2020-02-14 01:00:00+0700 to 2020-02-14 02:00:00+0700 has 98201 records.
Range 2020-02-14 06:00:00+0700 to 2020-02-14 07:00:00+0700 has 104529 records.
Range 2020-02-14 08:00:00+0700 to 2020-02-14 09:00:00+0700 has 105257 records.
...
我认为这意味着executeQuery()
方法运行良好。
我做错了什么?
根据查询的数量,您可能会耗尽cassandra线程-concurrent_reads(如果我没记错,默认数字是250(
如果检查日志(/var/log/cassandra/system.log
(,则应该会有一条与该问题相关的消息。要解决此问题,请添加一个人工线程。例如,在发送200个查询后等待。