我使用Kstream来使用kafka消息并将其持久化到我的数据库中。这些消息属于不同的pojo,目前我正在使用对象映射器来创建对象,然后将它们保存在DB中。我读到Jsondeserialzersede可以使用,但我不确定如何使用它来映射到不同的pojo。为每个pojo定制serde是没有意义的。请帮忙。提前谢谢。
这是我的代码:
public Consumer<KStream<String, String>> process() {
return input ->
inpu.foreach((key, value) -> {
ObjectMapper mapper = new ObjectMapper();
try {
if(value.contains("Teacher"))
{
Teacher teacher= mapper.readValue(value,Teacher.class);
teacherRepository.save(teacher);
}
else if(value.contains("Student"))
{
Student student= mapper.readValue(value,Student.class);
studentRepository.save(student)
}
else if(value.contains("Principal"))
{
Principal principal= mapper.readValue(value,Principal.class);
principalRepository.save(Principal);
}
} catch (JsonProcessingException e) {
e.printStackTrace();
}
}); }
}
Kafka Streams绑定器没有直接提供任何机制来执行您的要求。消费者的类型签名表明您将其作为String
使用,因此绑定器可以推断该信息,而无需显式提供任何Serde
。但是,如果您想使用jackson将String
进一步转换为其他类型,您需要在您的业务逻辑中自己完成,因为您已经拥有了它。如果你只有几个有限类型,我认为这样做没有问题