Spring Integration Kafka 3.0.1 -> 3.1.2 测试现在无法启动,如果启动时没有可用的代理



当一个项目从 Springboot 2.0 升级到 2.1 时,我们也spring-kafka-integration从 3.0.1 升级到 3.2.1。在这样做的过程中,由于以下原因,我们的测试现在都无法开始:

org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=x] Connection to node -1 could not be established. Broker may not be available.
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=x] Connection to node -1 could not be established. Broker may not be available.
org.apache.kafka.clients.NetworkClient   : [Consumer clientId=consumer-1, groupId=x] Connection to node -1 could not be established. Broker may not be available.
...
org.springframework.context.ApplicationContextException: Failed to start bean 'eventInboundFlow.kafka:message-driven-channel-adapter#0'; nested exception is org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata

我们的构建机器没有在本地运行的 Kafka,使用EmbeddedKafkaBroker的测试使用自定义 JUnit5 扩展来实现,该扩展在测试之间停止/启动容器侦听器(同时将所有分区和主题查找为最新版本,以防止测试意外泄漏消息以破坏后续测试中的预期)。虽然比@DirtiesContext快得多,但它不会像@EmbddedKafka注释那样在配置过程中将自身注入上下文中。

在以前的版本中,这不是问题;我们会看到一些日志消息,说明在扩展配置和启动代理时无法连接,但随后一切都很好。

在新版本中,上下文无法完全启动(自定义扩展甚至没有机会运行)。查看属性,我可以看到的有关启动失败的唯一设置是spring.kafka.admin.fail-fast,但默认情况下为 FALSE,我们不会更改它。

将其与将项目本身作为 springboot 应用程序启动相比,我看到的第一个区别是容器在作为应用程序运行时在自己的线程中启动,但在作为测试运行时在main/Test Worker线程上启动。在以前的版本中,测试还在自己的线程中启动了容器。

了解为什么测试现在表现不同吗?或者是否有办法配置它以使它们脱离主线程?

将容器属性missingQueuesFatal设置为false

参见 Apacher Kafka 2.2 的春天。新增功能。

添加了一个新的容器属性(缺少主题致命)。有关更多信息,请参阅使用 KafkaMessageListenerContainer。

从版本 2.2 开始,添加了一个名为 missingTopicsFatal 的新容器属性(默认值:true)。这可以防止容器在代理上不存在任何已配置主题时启动。如果容器配置为侦听主题模式 (regex),则它不适用。以前,容器线程在 consumer.poll() 方法中循环,等待主题出现,同时记录许多消息。除了日志之外,没有迹象表明存在问题。若要还原以前的行为,可以将该属性设置为 false。

将该属性设置为 false 将禁用检查。

使用弹簧引导(例如 2.2.2.RELEASE),您可以使用:

spring.kafka.listener.missing-topics-fatal=false

这将允许应用程序继续,但不接收有关缺少主题的消息。如果添加主题的时间晚于应用程序,则必须重新启动应用程序,以便使用者接收消息。

见8。集成属性

相关内容

最新更新