我们使用弹簧云流kafka粘合剂(带有项目反应器集成,即, Flux
流(和手动偏移提交(即autoCommitOffset = false
(。
我们正在尝试与Spring-Kafka检验的嵌入式Kafka编写集成测试通过手动阅读消费者群体的偏移,使用管理员客户端,这一切都应该主张这一切测试之前和之后向我们的主题发送消息。
测试间歇性失败。使用等待性,我们现在最多等待10秒钟来轮询偏移,而且这似乎解决了我们的大多数问题,因为偏移量在7秒钟后会发生变化 - 但这对测试不令人满意。
有没有办法确保春季云流kafka活页夹一旦我们通过调用Acknowledgement.acknowledge()
来确认消息收据,将立即编写偏移更改?
换句话说:我们如何在不需要等待的情况下验证acknowledge
的调用?
我们使用Kotlin,Mockito和Mockito-Kotlin,因此无法使用PowerMockito。
问题是Consumer
不是线程安全。提交必须在容器线程上完成。如果当您ACK时消费者坐在poll()
中,则必须等待pollTimeout
,然后才能承诺偏移。
默认的pollTimeout
为5秒。
您可以添加ListenerContainerCustomizer
@Bean
来修改ContainerProperties
。