Confluent Kafka connect ElasticSearch ID 文档创建



我正在使用 kafka connect elasticsearch 连接器将数据从主题写入 ElasticSearch 索引。主题消息的键和值均采用 json 格式。由于以下错误,连接器无法启动:

    org.apache.kafka.connect.errors.DataException: MAP is not supported as the document id.

以下是我的消息格式(键|值(:

    {"key":"OKOK","start":1517241690000,"end":1517241695000}     |   {"measurement":"responses","count":9,"sum":1350.0,"max":150.0,"min":150.0,"avg":150.0} 

以下是我用于创建连接器的 POST 请求的正文:

{
 "name": "elasticsearch-sink-connector",
 "config": {
 "connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
 "tasks.max": "1",
 "topics": "output-topic-elastic",
 "connection.url": "http://elasticsearch:9200",
 "type.name": "aggregator",
 "schemas.enable": "false",
 "topic.schema.ignore": "true",
 "topic.key.ignore": "false",
 "value.converter": "org.apache.kafka.connect.json.JsonConverter",
 "value.converter.schemas.enable": "false", 
 "key.converter": "org.apache.kafka.connect.json.JsonConverter",
 "key.converter.schemas.enable": "false", 
 "key.ignore":"false",
 "topic.index.map": "output-topic-elastic:aggregator",
 "name": "elasticsearch-sink",
 "transforms": "InsertKey",
"transforms.InsertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.InsertKey.fields":"key"
}}

任何帮助将不胜感激。我在堆栈溢出 1 上发现了一个类似的问题,但我对答案没有运气。

创建 ES 文档 ID

你还需要ExtractField在那里

"transforms": "InsertKey,extractKey",
"transforms.InsertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.InsertKey.fields":"key",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"key"

查看这篇文章以获取更多详细信息。

最新更新