我使用一个简单的文件源读取器
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
文件内容是每行中的简单 JSON 对象。我发现有一种方法可以替换记录键并使用转换来执行此操作,例如
# Add the `id` field as the key using Simple Message Transformations
transforms=InsertKey
# `ValueToKey`: push an object of one of the column fields (`id`) into the key
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=ip
但是我遇到了一个错误
仅支持 [将字段从值复制到键] 的结构对象, 找到: java.lang.String
有没有办法解析字符串 json 并从那里获取密钥,就像我可以对 Flume 和 regex_extractor 所做的那样?
在 SourceConnector 上使用转换时,转换是在SourceConnector.poll()
返回的List<SourceRecord>
上完成
的在您的情况下,FileStreamSourceConnector
读取文件的行,并将每一行作为 String 放入SourceRecord
对象中。因此,当变换得到SourceRecord
时,它只将其视为字符串,而不知道对象的结构。
为了解决这个问题,
- 您可以修改
FileStreamSourceConnector
代码,以便它返回具有输入 json 字符串的有效结构和架构的SourceRecord
。为此,您可以使用 Kafka 的 SchemaBuilder 类。
或者,如果在接收器连接器 - 中使用此数据,可以通过在接收器连接器上设置以下配置来让 KafkaConnect 将其转换为 JSON,然后在接收器连接器上执行转换。
"value.converter":"org.apache.kafka.connect.json.JsonConverter"value.converter.schemas.enable": "false">
如果您使用第二个选项,请不要忘记将这些配置放在您的 SourceConnector 上。
"value.convertor":"org.apache.kafka.connect.storage.StringConverter"value.converter.schemas.enable": "false">
一种方法可以替换记录键
有一个单独的转换称为org.apache.kafka.connect.transforms.ReplaceField$Key
InsertKey
将获取一个值并尝试插入到结构/映射中,但您似乎正在使用字符串键