Kafka/Confluent CSV/SFTP connector and nested JSON



I am trying to read a CSV file and publish it as JSON to a Kafka topic. I started from the CSV Source Connector example on the Confluent Platform site, using a local Confluent installation. With a basic example it works fine; my sample CSV data is:

name,street,city
Homer Simpson,742 Evergreen Terrace,Springfield

and I am able to read the JSON from the topic, like this:

{
  "name": "Homer Simpson",
  "street": "742 Evergreen Terrace",
  "number": "Springfield"
}

Now, I need to turn that CSV line into this JSON instead:

{
  "name": "Homer Simpson",
  "address": {
    "street": "742 Evergreen Terrace",
    "number": "Springfield"
  }
}
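To make the intent concrete, the reshaping I am after is just moving `street` and `number` under a new `address` key. A minimal sketch of that mapping in plain Python (standalone illustration, not Connect code; the function name is mine):

```python
import json

def nest_address(flat: dict) -> dict:
    # Move the flat street/number fields under a nested "address" object,
    # leaving the remaining fields (here just "name") at the top level.
    nested = {k: v for k, v in flat.items() if k not in ("street", "number")}
    nested["address"] = {"street": flat["street"], "number": flat["number"]}
    return nested

flat = {
    "name": "Homer Simpson",
    "street": "742 Evergreen Terrace",
    "number": "Springfield",
}
print(json.dumps(nest_address(flat), indent=2))
```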

Here is the source connector configuration I am using:

{
  "name": "NestedExample",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "tasks.max": 1,
    "connector.class": "io.confluent.connect.sftp.SftpCsvSourceConnector",
    "kafka.topic": "nested-topic",
    "cleanup.policy": "NONE",
    "behavior.on.error": "IGNORE",
    "input.path": "/home/project/opt/confluent-6.2.0/sftp/data",
    "error.path": "/home/project/opt/confluent-6.2.0/sftp/error",
    "finished.path": "/home/project/opt/confluent-6.2.0/sftp/finished",
    "input.file.pattern": ".*.csv",
    "sftp.username": "user",
    "sftp.password": "password",
    "sftp.host": "10.254.1.6",
    "sftp.port": "22",
    "csv.ignore.leading.whitespace": "true",
    "csv.first.row.as.header": "false",
    "csv.skip.lines": 1,
    "key.schema": "{\"name\" : \"com.example.users.UserKey\",\"type\" : \"STRUCT\",\"isOptional\" : true,\"fieldSchemas\" : {\"material\" : {\"type\" : \"STRING\",\"isOptional\" : true}}}",
    "value.schema": "{ \"name\" : \"com.example.users.User\", \"type\" : \"STRUCT\", \"isOptional\" : false, \"fieldSchemas\" : { \"name\" : { \"isOptional\" : false, \"type\" : \"STRING\" }, \"street\" : { \"isOptional\" : false, \"type\" : \"STRING\" }, \"number\" : { \"isOptional\" : false, \"type\" : \"STRING\" } } }"
  }
}

And here is my `value.schema`, formatted for readability:

{
  "name" : "com.example.users.User",
  "type" : "STRUCT",
  "isOptional" : false,
  "fieldSchemas" : {
    "name" : {
      "isOptional" : false,
      "type" : "STRING"
    },
    "street" : {
      "isOptional" : false,
      "type" : "STRING"
    },
    "number" : {
      "isOptional" : false,
      "type" : "STRING"
    }
  }
}

You would need to write a custom Single Message Transform to restructure several fields at once.

Alternatively, after ingesting with Connect, use ksqlDB or Kafka Streams to map the data into the output format you expect.
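As a sketch of the ksqlDB route (assuming the flat records land on `nested-topic` with the value schema shown in the question; the stream names and the output topic here are illustrative):

```sql
-- Register the flat records produced by the connector as a stream.
CREATE STREAM users_flat (
  name     VARCHAR,
  street   VARCHAR,
  `number` VARCHAR
) WITH (KAFKA_TOPIC = 'nested-topic', VALUE_FORMAT = 'JSON');

-- Derive a stream that nests street/number under an address STRUCT.
CREATE STREAM users_nested WITH (KAFKA_TOPIC = 'nested-output-topic') AS
  SELECT
    name,
    STRUCT(street := street, number := `number`) AS address
  FROM users_flat;
```

Records on `nested-output-topic` would then carry the nested `address` object in their JSON value.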
