Kafka Streams Serdes 具有嵌套泛型不起作用



我有以下代码,它使用函数样式为kafka主题定义两个函数


@Bean
public Function<KStream<String, CloudEvent<ClassA>>, KStream<String, CloudEvent<ClassB>>> method1() {
....... //lambda
}
@Bean
public Function<KStream<String, CloudEvent<ClassB>>, KStream<String, CloudEvent<ClassC>>> method2() {
...... //lambda
}

对于这两个函数,我定义了serdes,所以

@Bean
public Serde<CloudEventMessage<ClassA>> classASerde(ObjectMapper mapper, Validator validator) {     
return StreamsSerdes.classASerde(mapper,validator);
}
@Bean
public Serde<CloudEventMessage<ClassB>> classBSerde(ObjectMapper mapper, Validator validator) {     
return StreamsSerdes.classBSerde(mapper,validator);
}

这种构造不起作用,因为在运行时spring试图用CloutEvent<ClassA>的Serde反序列化CloudEvent<ClassB>有什么方法可以提示使用方法1和方法2的正确序列号吗

其次,我可以通过在application.properties 中提及Serdes来绕过上述问题

spring.application.cloud.stream.kafka.streams.bindings.method1-in-0.consumer.valueSerde=package.serde.StreamsSerdes$ClassASerde
spring.application.cloud.stream.kafka.streams.bindings.method2-in-0.consumer.valueSerde=package.serde.StreamsSerdes$ClassBSerde

然而,现在我遇到了其他问题,因为这些Serde类没有默认的构造函数。我确实需要ObjectMapper,Spring的Validator来注入bean(@Service(,以便在反序列化过程中执行转换/验证。

有没有人遇到过类似的问题,或者可能有解决问题的想法?

感谢

我认为嵌套泛型现在在绑定器中不起作用是一个缺口。您介意在存储库中创建一个问题并链接此线程吗?

至于在application.properties中提供属性时遇到的第二个问题,可以尝试使用变通方法。Serde接口有一个采用映射的configure方法。

default void configure(Map<String, ?> configs, boolean isKey) {
// intentionally left blank
}

Serde实现中重写这个方法,并在一些键下设置这些bean对象。

ObjectMapper mapper;
Validator validator;

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
this.mapper = (ObjectMapper) configs.get("mapper.key");
this.validator = (Validator) configs.get("validator.key");
}

您需要从构造函数中删除对它们的访问,并将这些字段直接用于反序列化和序列化。

然后在应用程序中提供这个bean来填充映射:

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer(ObjectMapper mapper, Validator validator) {
return factoryBean -> {
factoryBean.getStreamsConfiguration().put("mappeer.key", mapper);
factoryBean.getStreamsConfiguration().put("validator.key", validator);
};
}

我还没有在应用程序中尝试过这段代码,但你可以尝试一下,看看它是否能与你的代码一起使用。

最新更新