我正在为多个Spring Boot应用程序实现一个公共库,它将为Kafka提供一个简单的接口。我选择了Spring Cloud Stream进行Kafka集成。这个库必须支持多种不同的有效负载类型,但是我很难理解如何以干净和通用的方式完成此操作。
我知道我可以为每个有效负载类型实现多个@StreamListener
方法,例如:
@StreamListener(target = SinkBindings.INPUT, condition = "headers['X-Target-Type'] matches '.*\.Person'")
public void listenForMessage(Person payload) {
// do stuff with payload
}
但是,此解决方案无法针对许多不同的有效负载类型进行扩展。在理想的世界中,我不希望使用此公共库的应用程序甚至不知道 Kafka 侦听器方法。相反,我更愿意提供一个SPI,使用应用程序通过该SPI实现如下所示的接口,框架将调用实现Spring bean并向其传递正确的有效负载类型。
public interface MessageHandlingService<T extends BaseClass> {
void handleMessage(T payload);
}
对于每种有效负载类型,我希望避免实现单独的@StreamListener
方法,但由于 Java 的类型擦除,这似乎非常混乱。我想知道是否有一种现实的方法可以使用单个流侦听器方法将不同的有效负载对象路由到通用服务豆?
出现的另一个想法是在上面的接口中添加一个boolean support(Class<?> clazz)
方法,这将允许库检查哪个类支持有效负载,但这似乎有点"黑客"(???(。
传入消息的有效负载将精确转换为预期类型:没有办法对@StreamListener
执行其他操作。具有@StreamListener
的春云流方法取决于方法参数的预期类型。这是转换的信号。您无法将有效负载转换为类型,然后尝试仅使用@StreamListener
来确定要调用的方法。类级别的@KafkaListener
@KafkaHandler
方法可以在这里为您提供帮助:https://docs.spring.io/spring-kafka/docs/2.3.0.RC1/reference/html/#class-level-kafkalistener。
完全成熟的弹簧集成流程及其转换功能也可用于这样的逻辑。
有关内容类型协商的详细信息,请参阅文档:https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/2.2.1.RELEASE/spring-cloud-stream.html#content-type-management