我正在使用 kafka connect elasticsearch 连接器将数据从主题写入 ElasticSearch 索引。主题消息的键和值均采用 json 格式。由于以下错误,连接器无法启动:
org.apache.kafka.connect.errors.DataException: MAP is not supported as the document id.
以下是我的消息格式(键|值(:
{"key":"OKOK","start":1517241690000,"end":1517241695000} | {"measurement":"responses","count":9,"sum":1350.0,"max":150.0,"min":150.0,"avg":150.0}
以下是我用于创建连接器的 POST 请求的正文:
{
"name": "elasticsearch-sink-connector",
"config": {
"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "output-topic-elastic",
"connection.url": "http://elasticsearch:9200",
"type.name": "aggregator",
"schemas.enable": "false",
"topic.schema.ignore": "true",
"topic.key.ignore": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"key.ignore":"false",
"topic.index.map": "output-topic-elastic:aggregator",
"name": "elasticsearch-sink",
"transforms": "InsertKey",
"transforms.InsertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.InsertKey.fields":"key"
}}
任何帮助将不胜感激。我在堆栈溢出 1 上发现了一个类似的问题,但我对答案没有运气。
创建 ES 文档 ID
你还需要ExtractField
在那里
"transforms": "InsertKey,extractKey",
"transforms.InsertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.InsertKey.fields":"key",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"key"
查看这篇文章以获取更多详细信息。