无法使用 AbstractReactiveElasticsearchConfiguration 将数据写入 Elasti



我正在尝试将数据写入我的本地Elasticsearch Docker容器(7.4.2(,为了简单起见,我使用了Spring给出的AbstractReactiveElasticsearchConfiguration,也覆盖了entityMapper函数。我构建了扩展 ReactiveElasticsearchRepository 的存储库 最后,我使用我的自动连线存储库来保存包含数据的元素集合All((。但是Elasticsearch不会写入任何数据。此外,我有一个REST控制器,它基本上开始了我的整个过程,基本上什么都不返回,延迟结果>

来自我的 ApiDelegateImpl 的 REST 方法。

@Override
public DeferredResult<ResponseEntity<Void>> openUsageExporterStartPost() {
final DeferredResult<ResponseEntity<Void>> deferredResult = new DeferredResult<>();
ForkJoinPool.commonPool().execute(() -> {
try {
openUsageExporterAdapter.startExport();
deferredResult.setResult(ResponseEntity.accepted().build());
} catch (Exception e) {
deferredResult.setErrorResult(e);
}
}
);
return deferredResult;
}

我的弹性搜索配置

@Configuration
public class ElasticSearchConfig extends AbstractReactiveElasticsearchConfiguration {
@Value("${spring.data.elasticsearch.client.reactive.endpoints}")
private String elasticSearchEndpoint;
@Bean
@Override
public EntityMapper entityMapper() {
final ElasticsearchEntityMapper entityMapper = new ElasticsearchEntityMapper(elasticsearchMappingContext(), new DefaultConversionService());
entityMapper.setConversions(elasticsearchCustomConversions());
return entityMapper;
}
@Override
public ReactiveElasticsearchClient reactiveElasticsearchClient() {
ClientConfiguration clientConfiguration = ClientConfiguration.builder()
.connectedTo(elasticSearchEndpoint)
.build();
return ReactiveRestClients.create(clientConfiguration);
}
}

我的仓库

public interface OpenUsageRepository extends ReactiveElasticsearchRepository<OpenUsage, Long> {
}

我的 DTO

@Data
@Document(indexName = "open_usages", type = "open_usages")
@TypeAlias("OpenUsage")
public class OpenUsage {
@Field(name = "id")
@Id
private Long id;
......
}

我的适配器实现

@Autowired
private final OpenUsageRepository openUsageRepository;
...transform entity into OpenUsage...
public void doSomething(final List<OpenUsage> openUsages){
openUsageRepository.saveAll(openUsages)
}

最后是我的 IT 测试

@SpringBootTest(webEnvironment = WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestPropertySource(locations = {"classpath:application-it.properties"})
@ContextConfiguration(initializers = OpenUsageExporterApplicationIT.Initializer.class)
class OpenUsageExporterApplicationIT {

@LocalServerPort
private int port;
private final static String STARTCALL = "http://localhost:%s/open-usage-exporter/start/";
@Container
private static ElasticsearchContainer container = new ElasticsearchContainer("docker.elastic.co/elasticsearch/elasticsearch:6.8.4").withExposedPorts(9200);
static class Initializer implements ApplicationContextInitializer<ConfigurableApplicationContext> {
@Override
public void initialize(final ConfigurableApplicationContext configurableApplicationContext) {
final List<String> pairs = new ArrayList<>();
pairs.add("spring.data.elasticsearch.client.reactive.endpoints=" + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
pairs.add("spring.elasticsearch.rest.uris=http://" + container.getContainerIpAddress() + ":" + container.getFirstMappedPort());
TestPropertyValues.of(pairs).applyTo(configurableApplicationContext);
}
}
@Test
void testExportToES() throws IOException, InterruptedException {
final List<OpenUsageEntity> openUsageEntities = dbPreparator.insertTestData();
assertTrue(openUsageEntities.size() > 0);
final String result = executeRestCall(STARTCALL);
// Awaitility here tells me nothing is in ElasticSearch :(
}
private String executeRestCall(final String urlTemplate) throws IOException {
final String url = String.format(urlTemplate, port);
final HttpUriRequest request = new HttpPost(url);
final HttpResponse response = HttpClientBuilder.create().build().execute(request);
// Get the result.
return EntityUtils.toString(response.getEntity());
}
}
public void doSomething(final List<OpenUsage> openUsages){
openUsageRepository.saveAll(openUsages)
}

这在末尾缺少分号,因此不应编译。

但我认为这只是一个错字,现实中有一个分号。

无论如何,saveAll()返回一个Flux。此Flux只是保存数据的方法,直到有人(或类似blockLast()(调用subscribe()它才会"执行"。您只需将其Flux扔掉,因此保存永远不会执行。

如何解决这个问题?一种选择是添加.blockLast()呼叫:

openUsageRepository.saveAll(openUsages).blockLast();

但这将以阻塞方式保存数据,有效地击败反应性。

另一种选择是,如果您从支持响应性调用saveAll()的代码只是返回saveAll()返回的Flux,但是,由于您的doSomething()具有void返回类型,这是值得怀疑的。

无论如何,看不到您的startExport()如何连接到doSomething()。但看起来您的"调用代码"没有使用任何反应性概念,因此真正的解决方案是重写调用代码以使用反应性(获取Publisher并对其进行subscribe(),然后等到数据到达(,或者恢复使用阻塞 API(ElasticsearchRepository而不是ReactiveElasticsearchRepository(。

最新更新