卡夫卡连接转换.解析输入字符串并获取记录键



我使用一个简单的文件源读取器

connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1

文件内容是每行中的简单 JSON 对象。我发现有一种方法可以替换记录键并使用转换来执行此操作,例如

# Add the `id` field as the key using Simple Message Transformations
transforms=InsertKey
# `ValueToKey`: push an object of one of the column fields (`id`) into the key
transforms.InsertKey.type=org.apache.kafka.connect.transforms.ValueToKey
transforms.InsertKey.fields=ip

但是我遇到了一个错误

仅支持 [将字段从值复制到键] 的结构对象, 找到: java.lang.String

有没有办法解析字符串 json 并从那里获取密钥,就像我可以对 Flume 和 regex_extractor 所做的那样?

在 SourceConnector 上使用转换时,转换是在SourceConnector.poll()返回的List<SourceRecord>上完成

的在您的情况下,FileStreamSourceConnector读取文件的行,并将每一行作为 String 放入SourceRecord对象中。因此,当变换得到SourceRecord时,它只将其视为字符串,而不知道对象的结构。

为了解决这个问题,

  1. 您可以修改FileStreamSourceConnector代码,以便它返回具有输入 json 字符串的有效结构和架构的SourceRecord。为此,您可以使用 Kafka 的 SchemaBuilder 类。
  2. 或者,如果在接收器连接器
  3. 中使用此数据,可以通过在接收器连接器上设置以下配置来让 KafkaConnect 将其转换为 JSON,然后在接收器连接器上执行转换。

"value.converter":"org.apache.kafka.connect.json.JsonConverter"value.converter.schemas.enable": "false">

如果您使用第二个选项,请不要忘记将这些配置放在您的 SourceConnector 上。

"value.convertor":"org.apache.kafka.connect.storage.StringConverter"value.converter.schemas.enable": "false">

一种方法可以替换记录键

有一个单独的转换称为org.apache.kafka.connect.transforms.ReplaceField$Key

InsertKey将获取一个值并尝试插入到结构/映射中,但您似乎正在使用字符串键

相关内容

  • 没有找到相关文章

最新更新