我在 flink 中使用 ElasticSearch update by query API 进行,flink 并行度为 1。但是我得到了version_conflict_engine_exception,这是我在flink RichSinkFunction中的代码,如下所示:
UpdateByQueryRequestBuilder builder = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
builder.abortOnVersionConflict(true);
builder.source(indexName);
builder.filter(filter);
builder.setMaxRetries(MAX_RETRIES);
builder.refresh(true);
String updateTime = Instant.ofEpochMilli(ts).atZone(ZoneId.systemDefault())
.format(ELASTIC_SEARCH_DATE_TIME_FORMATTER);
Map<String, Object> params = Maps.newHashMap();
params.put("fieldName", fieldName);
params.put("updateTime", updateTime);
params.put("model", this.transformMap(JacksonUtils.convertValue(model, new TypeReference<Map<String, Object>>() {
})));
builder.script(new Script(ScriptType.INLINE, Script.DEFAULT_SCRIPT_LANG, UPDATE_BY_MODEL_PAINLESS_CODE, params));
BulkByScrollResponse response = builder.get();
我可以确定,只有这个应用程序访问 Elasticsearch,flink 并行度是 1 就像在单线程中通过查询 API 调用更新一样?为什么我有version_conflict_engine_exception?以及如何做一次?
我看到两种可能性:
- 正在运行的其他内容可以更新文档。
- Flink 的 elasticsearch sink 提供了至少一次的保证,这意味着在发生故障时,接收器有时会在恢复过程中执行重复写入。这可能会导致尝试使用过期的版本号更新文档。