在使用嵌入式KAFKA集成测试中,在Spring Cloud Stream中的集成测试时,如何立即验证一条消息



我们使用弹簧云流kafka粘合剂(带有项目反应器集成,即, Flux流(和手动偏移提交(即autoCommitOffset = false(。

我们正在尝试与Spring-Kafka检验的嵌入式Kafka编写集成测试通过手动阅读消费者群体的偏移,使用管理员客户端,这一切都应该主张这一切测试之前和之后向我们的主题发送消息。

测试间歇性失败。使用等待性,我们现在最多等待10秒钟来轮询偏移,而且这似乎解决了我们的大多数问题,因为偏移量在7秒钟后会发生变化 - 但这对测试不令人满意。

有没有办法确保春季云流kafka活页夹一旦我们通过调用Acknowledgement.acknowledge()来确认消息收据,将立即编写偏移更改?

换句话说:我们如何在不需要等待的情况下验证acknowledge的调用?

我们使用Kotlin,Mockito和Mockito-Kotlin,因此无法使用PowerMockito。

问题是Consumer不是线程安全。提交必须在容器线程上完成。如果当您ACK时消费者坐在poll()中,则必须等待pollTimeout,然后才能承诺偏移。

默认的pollTimeout为5秒。

您可以添加ListenerContainerCustomizer @Bean来修改ContainerProperties

最新更新