ElasticsearchSinkConnector 对象映射无法从嵌套更改为非嵌套



我正在将我的数据从 kafka 主题流式传输到 elasticsearch。但它从连接器{"type":"illegal_argument_exception","reason":"object mapping [search_data] can't be changed from nested to non-nested"}抛出此错误

但是当我从主题中获取消息并使用 elasticsearch api 手动添加文档时,它工作正常。

kafka-connect-elasticsearch不支持嵌套对象类型吗?

请帮我回复这个问题,因为我被困在这里好几天了。

弹性搜索版本:7.6.2

Kafka Connect Image: confluentinc/cp-kafka-connect:5.4.2

以下是我对连接器的配置。

{
"connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "es_sink_products",
"key.ignore": "false",
"schema.ignore": "true",
"connection.url": "localhost:9200",
"type.name": "kafka-connect",
"name": "product-elasticsearch-sink",
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
}

弹性搜索模式

{
"mappings": {
"properties": {
"search_result_data": {
"properties": {
"product_id": {"type": "long"},
"product_name": {"type": "text"},
}
},
"search_data":{
"type": "nested",
"include_in_parent": false,
"properties": {
"product_id": {"type": "long"},
"full_text": {
"type": "text",
},
}
}
}
}
}

来自主题es_sink_products的示例消息

{
"search_data": {
"product_id": 1,
"full_text": "Product 1"
},
"search_result_data": {
"product_id": 1,
"product_name": "Product Name 1"
}
}

这是来自连接器的完整错误 "org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.ntat org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:561)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)ntat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)ntat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)ntat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)ntat java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)ntat java.util.concurrent.FutureTask.run(FutureTask.java:266)ntat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)ntat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)ntat java.lang.Thread.run(Thread.java:748)nCaused by: org.apache.kafka.connect.errors.ConnectException: Bulk request failed: [{"type":"illegal_argument_exception","reason":"object mapping [search_data] can't be changed from nested to non-nested"}]ntat io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.handleMalformedDoc(BulkProcessor.java:479)ntat io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.execute(BulkProcessor.java:433)ntat io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:389)ntat io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkTask.call(BulkProcessor.java:375)ntat java.util.concurrent.FutureTask.run(FutureTask.java:266)ntat java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)ntat java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)ntat java.lang.Thread.run(Thread.java:748)ntat io.confluent.connect.elasticsearch.bulk.BulkProcessor$BulkProcessorThread.run(BulkProcessor.java:370)n"

当之前设置了非嵌套映射并且您尝试使用嵌套类型更新该映射时,会引发此错误。

您现在可以做的是:

  1. 删除索引
  2. 设置一次nested映射(您在上面称为Elasticsearch Schema映射(
  3. 使用选项"schema.ignore": "false"启动 kafka 流

原因:将非嵌套更改为nested被视为"重大更改",因为有效负载的索引方式。

问题出在kafka 连接器配置中的type.name。Elasticsearch 默认类型是_doc。由于我使用kafka-connect作为类型,因此 elasticsearch 假设我想添加另一种文档类型并与现有的_doc嵌套类型冲突。

通过更改连接器配置的"type.name": "_doc"解决了该问题。

相关内容

  • 没有找到相关文章

最新更新