将Kafka connect Struct转换为POJO



我正在开发一个组件,该组件从数据库中捕获数据更改(CDC(,对数据应用一些业务逻辑,然后将它们发送到一个新的主题,即azure事件中心主题。

我的堆栈包括:

  • 骆驼式管道
  • Debezium嵌入式(用于CDC(

debezium的这篇文章是我开始工作的基础。转换器用于将Struct转换为POJO。

@Converter
public static class Converters {
@Converter
public static Question questionFromStruct(Struct struct) {                   
return new Question(struct.getInt64("id"), struct.getString("text"),
struct.getString("email"));
}
@Converter
public static Answer answerFromStruct(Struct struct) {                       
return new Answer(struct.getInt64("id"), struct.getString("text"),
struct.getString("email"), struct.getInt64("question_id"));
}
}

在我的案例中,我处理的POJO是Avro生成的。此外,它还包括20多个我不想手动设置的属性。

我想到了MapStruct或Dozer之类的映射器。但它们不处理这种映射。

关于如何更自动化地处理struct,有什么见解吗?

如Debezium Engine文档中所写

在内部,引擎使用适当的Kafka Connect转换器实现,将转换委托给

这意味着,您需要反转Converter方法,然后使用Kafka反序列化程序类将数据返回到最初生成的POJO。


假设您使用了Confluent的AvroConverter,那么您应该能够使用AvroConverter#fromConnectData返回一个byte[]作为键/值。不过,您的Struct首先需要ConnectSchema实例。

由此,您可以将字节传递给KafkaAvroDeserializer#deserialize,并将响应强制转换为生成的类。


或者,在Debezium/Kafka Connect中转换数据的正确方法是使用简单消息转换并保留在Struct/Schema API中。

相关内容

  • 没有找到相关文章

最新更新