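I am trying to build a Flink streaming word count demo with an Elasticsearch (version 6.0.0) sink. Unfortunately I get the following error, which looks like a dependency conflict.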
Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.builder(Ljava/util/function/BiConsumer;Lorg/elasticsearch/action/bulk/BulkProcessor$Listener;)Lorg/elasticsearch/action/bulk/BulkProcessor$Builder;
at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:623)
at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
at com.quvideo.xiaoying.flink.elasticsearch.WordCountSinkElasticsearch.main(WordCountSinkElasticsearch.java:68)
Caused by: java.lang.NoSuchMethodError: org.elasticsearch.action.bulk.BulkProcessor.builder(Ljava/util/function/BiConsumer;Lorg/elasticsearch/action/bulk/BulkProcessor$Listener;)Lorg/elasticsearch/action/bulk/BulkProcessor$Builder;
at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createBulkProcessorBuilder(Elasticsearch6ApiCallBridge.java:92)
at org.apache.flink.streaming.connectors.elasticsearch6.Elasticsearch6ApiCallBridge.createBulkProcessorBuilder(Elasticsearch6ApiCallBridge.java:45)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.buildBulkProcessor(ElasticsearchSinkBase.java:353)
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase.open(ElasticsearchSinkBase.java:297)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Process finished with exit code 1
My Elasticsearch cluster is version 6.0.0, and the Flink dependencies are as follows:
<properties>
    <flink.version>1.6.0</flink.version>
    <elastic>6.0.0</elastic>
</properties>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-elasticsearch6_2.11</artifactId>
    <version>${flink.version}</version>
</dependency>
The Elasticsearch-related dependencies are as follows:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>${elastic}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-client</artifactId>
    <version>${elastic}</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>${elastic}</version>
</dependency>
And the Elasticsearch-related code:
public static ElasticsearchSink<WordWithCount> getEsSink() {
    List<HttpHost> httpHosts = new ArrayList<>();
    httpHosts.add(new HttpHost("10.0.35.148", 9200, "http"));
    ElasticsearchSink.Builder<WordWithCount> esSinkBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        new ElasticsearchSinkFunction<WordWithCount>() {
            public IndexRequest createIndexRequest(WordWithCount element) {
                Map<String, Object> json = new HashMap<>();
                json.put("word", element.word);
                json.put("count", element.count);
                return Requests.indexRequest()
                    .index("wordcount_idx")
                    .type("test_type")
                    .source(json);
            }

            @Override
            public void process(WordWithCount element, RuntimeContext ctx, RequestIndexer indexer) {
                indexer.add(createIndexRequest(element));
            }
        }
    );
    esSinkBuilder.setBulkFlushMaxActions(1);
    return esSinkBuilder.build();
}
For more detail, the error is triggered in this method of Elasticsearch6ApiCallBridge.java:
@Override
public BulkProcessor.Builder createBulkProcessorBuilder(RestHighLevelClient client, BulkProcessor.Listener listener) {
    return BulkProcessor.builder(client::bulkAsync, listener); // error message: "client is not a functional interface"
}
Thanks.
I filed https://issues.apache.org/jira/browse/FLINK-10173. As a workaround, you can simply add:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.3.1</version>
</dependency>
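The NoSuchMethodError in the stack trace suggests that the BulkProcessor class actually loaded at runtime lacks the builder(BiConsumer, Listener) overload that the connector calls. If it is unclear which Elasticsearch JAR ends up on the classpath after changing the dependency, a small reflection check along these lines may help. This is only a sketch: the class name ClasspathCheck and its two helper methods are hypothetical names made up for illustration, not part of Flink or Elasticsearch.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.Arrays;

// Hypothetical helper (not part of Flink or Elasticsearch): reports where a class
// was loaded from and whether it exposes a public method with a given signature.
final class ClasspathCheck {

    /** True if the class has a public method with this name and exactly these parameter type names. */
    static boolean hasMethod(String className, String methodName, String... paramTypeNames) {
        try {
            // Load without running static initializers; only the signatures are needed.
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            for (Method m : cls.getMethods()) {
                if (!m.getName().equals(methodName)) {
                    continue;
                }
                String[] actual = Arrays.stream(m.getParameterTypes())
                        .map(Class::getName)
                        .toArray(String[]::new);
                if (Arrays.equals(actual, paramTypeNames)) {
                    return true;
                }
            }
            return false;
        } catch (ClassNotFoundException | LinkageError e) {
            // Class (or one of the types in its signatures) is missing from the classpath.
            return false;
        }
    }

    /** Location (typically a JAR URL) the class was loaded from, or a placeholder string. */
    static String locationOf(String className) {
        try {
            Class<?> cls = Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            return src == null ? "(bootstrap or unknown)" : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        // Class and parameter type names taken from the NoSuchMethodError in the question.
        String cls = "org.elasticsearch.action.bulk.BulkProcessor";
        System.out.println("loaded from: " + locationOf(cls));
        System.out.println("builder(BiConsumer, Listener) present: "
                + hasMethod(cls, "builder",
                        "java.util.function.BiConsumer",
                        "org.elasticsearch.action.bulk.BulkProcessor$Listener"));
    }
}
```

Run it with the same classpath as the Flink job. If the second line prints false, an Elasticsearch version without that overload is still being resolved, and the printed location shows which JAR it came from.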