如何在Kafka connect SMT中重命名/替换结构中的字段



replaceField SMT的描述说它可以Filter or rename fields within a Struct or Map.但是我找不到任何替换或重命名结构中字段的工作示例。

我有一个主题中的数据正在使用Kafka Connect ElasticSearch Sink写入ElasticSearch。为了简单起见,假设数据的格式如下所示。

{
'ID':22, 
'ITEM': 'Shampoo'
'USER':{
'NAME': 'jon', 
'AGE':25
}
}

因此,如果我试图重命名/替换USER.NAMEUSER.AGE,我将如何在连接器中配置它?(我已经在ksqldb中编写了所有内容(。这是我当前的配置,我将ITEM重命名为product,将ID重命名为id

CREATE SINK CONNECTOR ELASTIC_SINK WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://host.docker.internal:9200',
'type.name' = '_doc',
'topics' = 'ELASTIC_TOPIC',
'key.ignore' = 'false',
'schema.ignore' = 'true',
'transforms' = 'RenameField',
'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
'transforms.RenameField.renames' = 'ITEM:product,ID:id',
);

查看现有SO问答:https://stackoverflow.com/a/56601093/4778022

可以提供要重命名的字段的路径,其中各部分用句点分隔。

CREATE SINK CONNECTOR ELASTIC_SINK WITH (
'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
'connection.url' = 'http://host.docker.internal:9200',
'type.name' = '_doc',
'topics' = 'ELASTIC_TOPIC',
'key.ignore' = 'false',
'schema.ignore' = 'true',
'transforms' = 'RenameField',
'transforms.RenameField.type' = 'org.apache.kafka.connect.transforms.ReplaceField$Value',
'transforms.RenameField.renames' = 'USER.NAME:name,ITEM:product,ID:id',
);

最新更新