我正在开发一个组件,该组件从数据库中捕获数据更改(CDC(,对数据应用一些业务逻辑,然后将它们发送到一个新的主题,即azure事件中心主题。
我的堆栈包括:
- 骆驼式管道
- Debezium嵌入式(用于CDC(
debezium的这篇文章是我开始工作的基础。转换器用于将Struct转换为POJO。
@Converter
public static class Converters {
@Converter
public static Question questionFromStruct(Struct struct) {
return new Question(struct.getInt64("id"), struct.getString("text"),
struct.getString("email"));
}
@Converter
public static Answer answerFromStruct(Struct struct) {
return new Answer(struct.getInt64("id"), struct.getString("text"),
struct.getString("email"), struct.getInt64("question_id"));
}
}
在我的案例中,我处理的POJO是Avro生成的。此外,它还包括20多个我不想手动设置的属性。
我想到了MapStruct或Dozer之类的映射器。但它们不处理这种映射。
关于如何更自动化地处理struct
,有什么见解吗?
如Debezium Engine文档中所写
在内部,引擎使用适当的Kafka Connect转换器实现,将转换委托给
这意味着,您需要反转Converter方法,然后使用Kafka反序列化程序类将数据返回到最初生成的POJO。
假设您使用了Confluent的AvroConverter,那么您应该能够使用AvroConverter#fromConnectData
返回一个byte[]
作为键/值。不过,您的Struct
首先需要ConnectSchema
实例。
由此,您可以将字节传递给KafkaAvroDeserializer#deserialize
,并将响应强制转换为生成的类。
或者,在Debezium/Kafka Connect中转换数据的正确方法是使用简单消息转换并保留在Struct/Schema API中。