Implementing a while loop in Reactor to get the latest Elasticsearch index



My index names in reactive Elasticsearch look like this:

logs-2020.08.18
logs-2020.08.17
logs-2020.08.16

A new index is created every day.
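The daily naming scheme above can be sketched in plain Java as follows. This is only an illustration: the class name `IndexNames` and both helper methods are hypothetical, and the `logs-` prefix and `yyyy.MM.dd` pattern are taken from the index names listed above.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class IndexNames {
    // Date suffix pattern inferred from names such as logs-2020.08.18.
    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    /** Builds the index name for a single day. */
    public static String indexNameFor(LocalDate day) {
        return "logs-" + day.format(SUFFIX);
    }

    /** Lists candidate index names, newest first, starting at the given day. */
    public static List<String> candidates(LocalDate start, int days) {
        List<String> names = new ArrayList<>();
        for (int i = 0; i < days; i++) {
            names.add(indexNameFor(start.minusDays(i)));
        }
        return names;
    }

    public static void main(String[] args) {
        // prints [logs-2020.08.18, logs-2020.08.17, logs-2020.08.16]
        System.out.println(candidates(LocalDate.of(2020, 8, 18), 3));
    }
}
```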

I want to get the latest index name and fetch the logs from it using reactiveElasticsearchClient or Spring Data. Is that possible?

I tried the following in my Spring WebFlux application:

I have the following code snippet to check whether an index exists:

public Flux<Log> getLogFromLatestIndex(String serialId) {
    Calendar cal = Calendar.getInstance();
    String currentIndex = StringUtils.EMPTY;
    boolean indexExists = false;
    while (!indexExists) {
        currentIndex = String.format("logs-%s", format(cal.getTime(), "yyyy.MM.dd"));
        indexExists = isIndexExists(currentIndex).block(); // blocking call on a reactive thread
        cal.add(Calendar.DATE, -1); // go back one day until an existing index is found
    }
    SearchQuery searchQuery = new NativeSearchQueryBuilder()
            .withQuery(matchQuery("serialId", serialId))
            .withIndices(currentIndex)
            .build();
    return reactiveElasticsearchTemplate.find(searchQuery, Log.class);
}

public Mono<Boolean> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName));
}

How can I get the boolean value without using block()?

indexExists = isIndexExists(currentIndex).block();

Obviously, I get the following error:

java.lang.IllegalStateException: block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor-http-nio-2

You can use Flux.generate together with the (take/skip)(Until/While) operators to write a while loop in Reactor.

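Notes:

  • Calendar is replaced with LocalDate because LocalDate is immutable, which suits functional/reactive programming better
  • The isIndexExists method returns a Tuple2 so that the index name stays available downstream; it can of course be replaced with a more descriptive class if needed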
public Flux<Log> getLog(String serialId) {
    return Flux.generate(LocalDate::now, this::generateNextDate)
            .map(day -> String.format("logs-%s", day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"))))
            .concatMap(this::isIndexExists)
            .skipUntil(Tuple2::getT2) // check index exists boolean and drop non-existing ones
            .next() // takes first existing
            .flatMapMany(tuple -> findLogs(tuple.getT1(), serialId));
}

private LocalDate generateNextDate(LocalDate currentDay, SynchronousSink<LocalDate> sink) {
    sink.next(currentDay);
    return currentDay.minusDays(1);
}

private Mono<Tuple2<String, Boolean>> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName))
            .map(exists -> Tuples.of(indexName, exists));
}

private Flux<Log> findLogs(String index, String serialId) {
    // your other ES query here
}
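To make the intent of the pipeline above easier to follow, here is a synchronous model of the same walk-back logic using only java.util.stream. It is a sketch, not the reactive implementation: the class `LatestIndexModel`, the `maxDays` bound, and the `exists` predicate (a stand-in for the Elasticsearch existence check) are all hypothetical. The bound is an addition that the code above does not have; without one, the search never terminates when no index exists at all. In the reactive version, a similar limit could probably be applied with `take(n)` before `concatMap`.

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;
import java.util.stream.Stream;

public class LatestIndexModel {
    private static final DateTimeFormatter SUFFIX = DateTimeFormatter.ofPattern("yyyy.MM.dd");

    /**
     * Walks back one day at a time from start and returns the first index
     * name accepted by the exists predicate, checking at most maxDays days.
     */
    public static Optional<String> latestIndex(LocalDate start, int maxDays, Predicate<String> exists) {
        return Stream.iterate(start, day -> day.minusDays(1)) // plays the role of Flux.generate
                .limit(maxDays)                               // hypothetical safety bound
                .map(day -> "logs-" + day.format(SUFFIX))
                .filter(exists)                               // drop names whose index does not exist
                .findFirst();                                 // first hit is the newest index
    }

    public static void main(String[] args) {
        // Stand-in for the cluster state; a real check would query Elasticsearch.
        Set<String> existing = Set.of("logs-2020.08.18", "logs-2020.08.17");
        // prints Optional[logs-2020.08.18]
        System.out.println(latestIndex(LocalDate.of(2020, 8, 21), 30, existing::contains));
        // prints Optional.empty (only 08.21 and 08.20 are checked)
        System.out.println(latestIndex(LocalDate.of(2020, 8, 21), 2, existing::contains));
    }
}
```

Because the stream is lazy, the predicate is evaluated one name at a time and stops at the first match, which mirrors what `concatMap` followed by `next()` is meant to do in the reactive code.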

application.yml

spring.data.elasticsearch.client.reactive.endpoints: ip1:9200,ip2:9200,ip3:9200
spring.data.elasticsearch.rest.uris: ip1:9200,ip2:9200,ip3:9200

Configuration.java

@Configuration
public class ElasticSearchConfig extends AbstractReactiveElasticsearchConfiguration {
    @Value("${spring.data.elasticsearch.client.reactive.endpoints}")
    private String elasticSearchEndpoint;

    @Override
    public ReactiveElasticsearchClient reactiveElasticsearchClient() {
        final ClientConfiguration clientConfiguration = ClientConfiguration.builder()
                .connectedTo(elasticSearchEndpoint.split(","))
                .withWebClientConfigurer(webClient -> {
                    ExchangeStrategies exchangeStrategies = ExchangeStrategies.builder()
                            .codecs(configurer -> configurer.defaultCodecs()
                                    .maxInMemorySize(-1))
                            .build();
                    return webClient.mutate().exchangeStrategies(exchangeStrategies).build();
                })
                .build();
        return ReactiveRestClients.create(clientConfiguration);
    }
}

Controller.java

@GetMapping("/getLog/{serialId}")
public Flux<Log> getLog(@PathVariable String serialId) {
    return loggerService.getLog(serialId);
}

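The rest is all your code. I am only printing the index name inside map. Although the logs-2020.08.21 index exists in Elasticsearch, it keeps printing index names such as logs-2020.08.20, logs-2020.08.19, logs-2020.08.18 and so on, and eventually fails with an error.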

Note: I get the same error when I use a single IP in application.yml.

public Flux<Log> getLog(String serialId) {
    return Flux.generate(LocalDate::now, this::generateNextDate)
            .map(day -> {
                System.out.println(String.format("logs-%s", day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd"))));
                return String.format("logs-%s", day.format(DateTimeFormatter.ofPattern("yyyy.MM.dd")));
            })
            .flatMap(this::isIndexExists)
            .skipUntil(Tuple2::getT2) // check index exists boolean and drop non-existing ones
            .next() // takes first existing
            .flatMapMany(tuple -> findLogs(tuple.getT1(), serialId));
}

private LocalDate generateNextDate(LocalDate currentDay, SynchronousSink<LocalDate> sink) {
    sink.next(currentDay);
    return currentDay.minusDays(1);
}

private Mono<Tuple2<String, Boolean>> isIndexExists(String indexName) {
    return reactiveElasticsearchClient.indices().existsIndex(new GetIndexRequest().indices(indexName))
            .map(exists -> Tuples.of(indexName, exists));
}

private Flux<Log> findLogs(String index, String serialId) {
    // your other ES query here
}