I get the following error when creating the Elasticsearch connector.
CREATE SOURCE CONNECTOR testdemosinkconnector WITH
(
"type.name"='_doc',
"input.data.format"='AVRO',
"connector.class"='io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
"tasks.max"='1',
"transforms"='dealership',
"topics"='e.contact.model',
"transforms.dealership.type"='io.confluent.connect.transforms.ExtractTopic$Value',
"transforms.dealership.field"='indexTopicName',
"transforms.dealership.skip.missing.or.null"='true',
"connection.url"='https://elasticsearchdemo.es.us-central1.gcp.cloud.es.io:9243',
"connection.username"='elastic',
"connection.password"='BUgBxOBg3dv4jp4Z3W7p4tHC',
"key.ignore"='true',
"value.converter"='io.confluent.connect.avro.AvroConverter',
"value.converter.schemas.enable"='true',
"value.converter.schema.registry.url"='http://localhost:8081',
"bulk.size.bytes"='-1',
"behavior.on.null.values"='ignore',
"behavior.on.malformed.documents"='ignore',
"max.retries"='5',
"retry.backoff.ms"='5000'
);
The error is:
FAILED | org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:618)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:334)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:235)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:200)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:255)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed
at io.confluent.connect.elasticsearch.ElasticsearchClient$1.afterBulk(ElasticsearchClient.java:397)
at org.elasticsearch.action.bulk.BulkRequestHandler$1.onFailure(BulkRequestHandler.java:70)
at org.elasticsearch.action.ActionListener$5.onFailure(ActionListener.java:258)
at org.elasticsearch.action.bulk.Retry$RetryHandler.onFailure(Retry.java:126)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:174)
... 5 more
Caused by: org.apache.kafka.connect.errors.ConnectException: Failed to execute bulk request due to 'java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=https://elasticsearchdemo.es.us-central1.gcp.cloud.es.io:9243, response=HTTP/1.1 200 OK}' after 6 attempt(s)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:165)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:119)
at io.confluent.connect.elasticsearch.ElasticsearchClient.callWithRetries(ElasticsearchClient.java:425)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$1(ElasticsearchClient.java:168)
... 5 more
Caused by: java.io.IOException: Unable to parse response body for Response{requestLine=POST /_bulk?timeout=1m HTTP/1.1, host=https://elasticsearchdemo.es.us-central1.gcp.cloud.es.io:9243, response=HTTP/1.1 200 OK}
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1632)
at org.elasticsearch.client.RestHighLevelClient.performRequest(RestHighLevelClient.java:1583)
at org.elasticsearch.client.RestHighLevelClient.performRequestAndParseEntity(RestHighLevelClient.java:1553)
at org.elasticsearch.client.RestHighLevelClient.bulk(RestHighLevelClient.java:533)
at io.confluent.connect.elasticsearch.ElasticsearchClient.lambda$null$0(ElasticsearchClient.java:170)
at io.confluent.connect.elasticsearch.RetryUtil.callWithRetries(RetryUtil.java:158)
... 8 more
Caused by: java.lang.NullPointerException
at java.base/java.util.Objects.requireNonNull(Objects.java:221)
at org.elasticsearch.action.DocWriteResponse.<init>(DocWriteResponse.java:127)
at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:54)
at org.elasticsearch.action.index.IndexResponse.<init>(IndexResponse.java:39)
at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:107)
at org.elasticsearch.action.index.IndexResponse$Builder.build(IndexResponse.java:104)
at org.elasticsearch.action.bulk.BulkItemResponse.fromXContent(BulkItemResponse.java:159)
at org.elasticsearch.action.bulk.BulkResponse.fromXContent(BulkResponse.java:196)
at org.elasticsearch.client.RestHighLevelClient.parseEntity(RestHighLevelClient.java:1892)
at org.elasticsearch.client.RestHighLevelClient.lambda$performRequestAndParseEntity$8(RestHighLevelClient.java:1554)
at org.elasticsearch.client.RestHighLevelClient.internalPerformRequest(RestHighLevelClient.java:1630)
... 13 more
Please help me resolve this error. Elasticsearch sink connector version: 11.1.10. Elasticsearch version: 8.2.2.
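Beginning with version 11.0.0, the connector uses the Elasticsearch High Level REST Client (version 7.0.1), which means only Elasticsearch 7.x is supported. Your cluster runs Elasticsearch 8.2.2, which falls outside that range and is consistent with the NullPointerException thrown while the client parses the bulk response.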
https://docs.confluent.io/kafka-connect-elasticsearch/current/overview.html
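
If you want to confirm the mismatch before deploying, a quick check along these lines may help. This is only a minimal sketch: the helper names (`major_version`, `is_supported`, `fetch_es_version`) are made up for illustration, the supported major version of 7 is taken from the documentation quoted above, and the fetch helper assumes the cluster's root endpoint is reachable without authentication (an Elastic Cloud deployment like yours would need credentials added to the request).

```python
import json
import urllib.request

# Assumption: taken from the connector docs quoted above (11.x connector line).
SUPPORTED_MAJOR = 7


def major_version(version: str) -> int:
    """Return the major component of a version string such as '8.2.2'."""
    return int(version.split(".")[0])


def is_supported(es_version: str, supported_major: int = SUPPORTED_MAJOR) -> bool:
    """True when the cluster's major version matches the one the connector supports."""
    return major_version(es_version) == supported_major


def fetch_es_version(base_url: str, timeout: float = 10.0) -> str:
    """Read version.number from the cluster's root endpoint (GET /).

    Hypothetical helper: sends no credentials, so it only works against
    a cluster that allows unauthenticated access to the root endpoint.
    """
    with urllib.request.urlopen(base_url, timeout=timeout) as resp:
        return json.load(resp)["version"]["number"]


if __name__ == "__main__":
    # Versions from the question: the cluster is 8.2.2.
    for version in ("7.17.4", "8.2.2"):
        status = "supported" if is_supported(version) else "not supported"
        print(f"Elasticsearch {version}: {status}")
```

With the versions from the question, `is_supported("8.2.2")` returns `False`, so the options are to point the connector at a 7.x cluster or to use a connector release that documents support for Elasticsearch 8.x.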