Kafka Connect API and Avro Object (SourceRecord vs org.apach



我在使用 kafka源连接器将 Avro 对象(org.apache.avro.specific.SpecificRecord 的实例(发送到 kafka 主题时遇到问题(需要准备 SourceRecord 实例(。 就我而言,我假设基于模式,例如:

{
"namespace": "com.model.avro.generated",
"type": "record",
"name": " MessageExVal",
"version": "1",
"fields": [
{
"name": "messageSource",
"type": "string"
},
{
"name": "messageSourceVersion",
"type": [
"string",
"null"
]
}
]
}

在 Mavenavro-maven-plugin的帮助下,我将生成项目中使用的类模型。 类MessageExVal的实例为我提供了"org.apache.avro.Schema"(通过方法getSchema() or getClassSchema()(。从第二端 kafka 连接 api 要求我org.apache.kafka.connect.data.Schema能够创建由源连接器方法返回poll()SourceRecord的新实例。 在配置中,我提供了参数:

"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",

在"poll"方法之后执行的方法fromConnectData()AvroConverter代码中,我看到将从org.apache.kafka.connect.data.Schema转换为org.apache.avro.Schema。那么是否有任何选项可以在不首先将其转换为"连接版本"的情况下传递 avro 模式,因为稍后无论如何它都会转换回 avro ? 您可以在下面找到我所指的代码中带有注释点的轮询方法的实现:

@Override
public List<SourceRecord> poll() throws InterruptedException {
List<SourceRecord> records = new LinkedList<SourceRecord>();
MessageExVal myValue = MessageExVal.newBuilder()
.setMessageType(“some value”)
.setMessageSource(“some other value”)
.build();
SourceRecord sr = new SourceRecord(null, null,
"test_topic",
myValue.getSchema(), //incorrect - different types
myValue);
records.add(sr);
return records;
}

总结一切,我的问题是如何使用kafka连接SourceConnector将"myValue"放入主题中?我将非常感谢每一个提示:)

因为后来它还是被转换回了 avro ?

数据以二进制形式存储在主题中,因此您仍然需要支付反序列化成本

kafka connect api 需要我 org.apache.kafka.connect.data.Schema 才能创建 SourceRecord 的新实例

是的。您可以使用toConnectData来获取它,也可以直接从代码的依赖项中删除 Avro 并直接从 Connect 创建架构和结构实例。

转换器负责序列化,连接中不需要 Avro

最新更新