我问这个问题是因为在原始情况下没有答案:弹性卡夫卡连接器,ID创建。
我也有类似的情况。
弹性搜索表,用于为单个字段创建记录,但在通过 kafkaconnect 发送请求时不为多个字段创建记录。
在弹性搜索中出现异常"键用作文档 ID,不能为 null"。
我的连接器配置:
{
"name": "test-connector33",
"config": {
"connector.class":"io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
"tasks.max": "1",
"topics": "test-connector33",
"connection.url": "http://localhost:9200",
"type.name": "aggregator",
"schema.ignore": "true",
"topic.schema.ignore": "true",
"topic.key.ignore": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "false",
"key.ignore":"false",
"name": "test-connector33",
"transforms": "InsertKey,extractKey",
"transforms.InsertKey.type":"org.apache.kafka.connect.transforms.ValueToKey",
"transforms.InsertKey.fields":"customerId,city",
"transforms.extractKey.type":"org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractKey.field":"customerId,city"
}}
知道如何解决这个问题吗?
提前感谢!
org.apache.kafka.connect.transforms.ExtractField$Key
仅支持单个字段。
假设您的 JSON 对象是HashMap<String, Object>
。你找不到字段customerId,city
,因此map.get(field)
操作返回null
,因此将字段设置为 null。
如果要通过控制台生成器发送密钥,欢迎通过添加--property print.key=true
作为标志,然后键入密钥,按 tab 键,然后输入值来执行此操作。如果要将数据回显到进程中,还可以为垂直条设置--property key.separator='|'
,以及添加--property parse.key=true