情况如下。
我们在 Kafka Broker 中设置了 SSL + ACL。
我们正在设置流,它从两个主题读取消息:
KStream<String, String> stringInput
= kBuilder.stream( STRING_SERDE, STRING_SERDE, inTopicName );
stringInput
.filter( streamFilter::passOrFilterMessages )
.map( processor )
.to( outTopicName );
它像两次一样完成(在循环中)。 然后我们设置通用错误处理程序:
streams.setUncaughtExceptionHandler( ( Thread t, Throwable e ) -> {
synchronized ( this ) {
LOG.fatal( ... );
this.stop();
}
}
);
问题如下。例如,如果在一个主题中证书不再有效。流正在引发异常 无权访问主题... 目前为止,一切都好。
但是异常由常规错误处理程序处理,因此即使第二个主题没有问题,整个应用程序也会停止。
问题是,如何处理每个主题的此异常? 如何避免由于单个主题存在授权问题而导致在某个时刻完成应用程序停止的情况?
我了解,如果经纪人不可用,那么完整的应用程序可能会停止。但是,如果只有一个主题不可用,那么单个流应该停止,并且没有完成应用程序,或者?
根据设计,Kafka Streams 将拓扑视为一个拓扑,无法区分这两个部分。对于您的特定情况,当您循环并构建到独立的管道时,您可以并行运行两个KafkaStreams
实例(在同一应用程序/JVM 中)以将两者彼此隔离。因此,如果一个失败,另一个不受影响。您需要对这两个实例使用两种不同的application.id
。