我使用Spring KafkaStreamBuilderFactoryBean
配置Kafka Streams,创建嵌入Spring Boot应用程序中的Kafka Stream应用程序。然后,我自动连接StreamsBuilder
以设置Kafka Streams拓扑:
@Bean("streamsBuilder")
public StreamsBuilderFactoryBean streamBuilderFactory() {
Map<String, Object> config = createConfig();
CleanupConfig cleanupConfig = new CleanupConfig(false, false);
KafkaStreamsConfiguration kafkaStreamsConfiguration = new KafkaStreamsConfiguration(config);
return new StreamsBuilderFactoryBean(kafkaStreamsConfiguration, cleanupConfig);
}
和
@Bean("streamTopology")
public KStream<Key, Value> configureTopology(
@Qualifier("streamsBuilder") StreamsBuilder builder) {
// set up and return topology
}
对于使用一个或多个GlobalKTables的带有嵌入式Kafka Streams应用程序的应用程序,在关闭时会始终生成以下错误日志:
stream-client [xxx] Global thread has died. The streams application or client will now close to ERROR.
Spring似乎没有在应用程序关闭时正常停止Streams应用程序。是否缺少某些配置或关闭挂钩以防止这种情况发生?我的印象是,春季卡夫卡将处理优雅的关闭。
提前感谢您的帮助!
正如Mathias Sax在Slack上回答的那样,这是Kafka 2.8.0:的一个已知错误
请参阅https://issues.apache.org/jira/browse/KAFKA-13423以及相应的Pull Requesthttps://github.com/apache/kafka/pull/11455