@StreamListener的自动启动



@KafkaListener不同,@StreamListener似乎不支持autoStartup参数。有没有一种方法可以实现@StreamListener的相同行为?这是我的用例:

我有一个通用的Spring应用程序,它可以监听任何Kafka主题,并写入数据库中相应的表。对于某些主题,音量很低,因此可以用很低的延迟处理单个消息。对于其他高容量的主题,代码应该接收一个消息的微批,并使用Jdbc批以较少的频率写入数据库。理想情况下,听众的定义应该是这样的:

// low volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.singleMessageListenerEnabled}")
public void handleSingleMessage(@Payload GenericRecord message) ...
// high volume listener
@StreamListener(target = Sink.INPUT, autoStartup="${application.multipleMessageListenerEnabled}")
public void handleMultipleMessages(@Payload List<GenericRecord> messageList) ...

对于低音量主题,我会将application.singleMessageListenerEnabled设置为true,将application.multipleMessageListenerEnabled设置为false;对于高音量主题,反之亦然。因此,只有一个侦听器会主动侦听消息,而另一个不会主动侦听。

有没有办法用@StreamListener实现这一点?

首先,请考虑升级到函数式编程模型,这需要几分钟的重构时间。我们几乎都反对基于注释的编程模型。如果你这样做了,那么你试图完成的事情就很容易了:

@SpringBootApplication
public class SimpleStreamApplication {
public static void main(String[] args) throws Exception {
SpringApplication.run(SimpleStreamApplication.class);
}
@Bean
public Consumer<GenericRecord> singleRecordConsumer() {...}
@Bean
public Consumer<List<GenericRecord>> multipleRecordConsumer() {...}
}

然后,您可以简单地对单个案例使用--spring.cloud.function.definition=singleRecordConsumer属性,并在启动应用程序时使用--spring.cloud.function.definition=multipleRecordConsumer,这样可以确保要激活哪个特定侦听器。

最新更新