How do I enable setApiCompatibilityMode in the Elasticsearch 7 sink of a Flink streaming job?

I have a Flink streaming job that writes processed data to an Elasticsearch version 8 cluster. Unfortunately I am on Flink version 1.13.6, which still ships the Elasticsearch version 7 sink, so when I try to save data I get a parse error.

I have two options: either tear down my current cluster and set up an Elasticsearch 7 cluster, or enable compatibility mode on the sink.

I can't tear down my cluster because it holds a large amount of important data that would take forever to restore, and snapshots can't be restored from a higher version to a lower one, so I'm left with the second option.

Searching online I found this: https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-compatibility.html
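As far as I can tell, that page turns compatibility mode on while the RestHighLevelClient itself is being constructed, along these lines (just a sketch with a placeholder host and class name, not taken from my job):

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.RestHighLevelClientBuilder;

public class CompatibleClientSketch {
    public static RestHighLevelClient buildClient() {
        // low-level REST client pointing at the cluster (placeholder host)
        RestClient lowLevelClient = RestClient
                .builder(new HttpHost("localhost", 9200, "http"))
                .build();
        // wrap it and switch on 7.x REST API compatibility mode for talking to an 8.x cluster
        return new RestHighLevelClientBuilder(lowLevelClient)
                .setApiCompatibilityMode(true)
                .build();
    }
}

But as far as I can see, Flink's RestClientFactory only hands me the low-level RestClientBuilder, not this high-level builder.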

The problem is that I don't know where to enable it. Here is my code:

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.flink.streaming.connectors.elasticsearch7.RestClientFactory;
import org.apache.flink.util.ExceptionUtils;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

public class ElasticMemberSink implements Serializable {

    private static final long serialVersionUID = 1L;

    private transient ElasticsearchSink.Builder<Tuple4<String, Integer, Integer, Integer>> memberEsSinkBuilder;

    public ElasticMemberSink(List<HttpHost> httpHosts, String elasticPassword) {
        // create member sink
        memberEsSinkBuilder = new ElasticsearchSink.Builder<>(httpHosts,
                new ElasticsearchSinkFunction<Tuple4<String, Integer, Integer, Integer>>() {

                    public IndexRequest createIndexRequest(Tuple4<String, Integer, Integer, Integer> memberSummaryTuple)
                            throws JsonProcessingException {
                        Date date = new Date();
                        Map<String, Object> json = new HashMap<>();
                        json.put("serverId", memberSummaryTuple.f0);
                        json.put("date", String.valueOf(date.getTime()));
                        json.put("numLeft", memberSummaryTuple.f1);
                        json.put("numJoined", memberSummaryTuple.f2);
                        json.put("memberCount", memberSummaryTuple.f3);
                        return Requests.indexRequest().index("prod-members").type("_doc").source(json);
                    }

                    @Override
                    public void process(Tuple4<String, Integer, Integer, Integer> memberSummaryTuple, RuntimeContext ctx,
                            RequestIndexer indexer) {
                        try {
                            indexer.add(createIndexRequest(memberSummaryTuple));
                        } catch (JsonProcessingException e) {
                            e.printStackTrace();
                        }
                    }
                });

        // set RestClientFactory to provide authentication
        if (elasticPassword != null) {
            memberEsSinkBuilder.setRestClientFactory(restClientBuilder -> {
                restClientBuilder.setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
                    @Override
                    public HttpAsyncClientBuilder customizeHttpClient(HttpAsyncClientBuilder httpClientBuilder) {
                        // elasticsearch username and password
                        CredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                        credentialsProvider.setCredentials(AuthScope.ANY,
                                new UsernamePasswordCredentials("elastic", elasticPassword));
                        return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                    }
                });
            });
        }

        // set number of events to be seen before writing to Elasticsearch
        memberEsSinkBuilder.setBulkFlushMaxActions(1);

        // handle failing elasticsearch requests
        memberEsSinkBuilder.setFailureHandler(new ActionRequestFailureHandler() {
            @Override
            public void onFailure(ActionRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer)
                    throws Throwable {
                if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
                    // full queue; re-add document for indexing
                    indexer.add(action);
                } else if (ExceptionUtils.findThrowable(failure, ElasticsearchParseException.class).isPresent()) {
                    // malformed document; simply drop request without failing sink
                } else {
                    // for all other failures, fail the sink
                    // here the failure is simply rethrown, but users can also choose to throw
                    // custom exceptions
                    throw failure;
                }
            }
        });
    }

    public ElasticsearchSink.Builder<Tuple4<String, Integer, Integer, Integer>> getSinkBuilder() {
        return memberEsSinkBuilder;
    }
}
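For completeness, this is roughly how the sink gets attached to the stream in my job (the host, port and the wrapper class name here are placeholders):

import java.util.Collections;
import java.util.List;

import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.http.HttpHost;

public class MemberSinkWiring {
    // attach the Elasticsearch sink built above to the already-computed member summary stream
    public static void attach(DataStream<Tuple4<String, Integer, Integer, Integer>> memberSummaries,
            String elasticPassword) {
        List<HttpHost> httpHosts = Collections.singletonList(new HttpHost("my-es-host", 9200, "https"));
        memberSummaries.addSink(new ElasticMemberSink(httpHosts, elasticPassword).getSinkBuilder().build());
    }
}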

There is no immediate fix. Flink's Elasticsearch connector uses the RestHighLevelClient. Compatibility mode was only added in version 7.17 of that client, but Elastic has relicensed the client under the incompatible SSPL license, which blocks the current Flink implementation from upgrading to it. You can track https://issues.apache.org/jira/browse/FLINK-26088 for proper Elasticsearch 8 support, although no volunteer has picked it up yet.
