我正在使用一个共享回复主题(即组)的ReplyingKafkaTemplate
。id为null
,组管理已禁用)。初始偏移量设置为'latest'。
我面临着以下问题:
- 我手动启动侦听器容器,它会导致一个异步操作,为消费者获取元数据。
- 我调用
sendAndReceive()
方法 - 在第一步异步操作返回之前创建回复消息。
- 异步操作返回并设置偏移为1(由于'latest'),因此应答消息将被忽略。
为了解决这个问题,我需要断言在调用sendAndReceive()
方法之前初始化偏移量。我已经寻找了一个合适的应用程序事件,但它们似乎都不合适。
是否有任何方法可以断言消费者已经完全初始化?
这个问题类似于等待kafka消费者在向kafka spring boot写入消息之前进行轮询,但建议的解决方案也不能解决这个问题(除非我误解了这个建议,它只是降低了风险,但不能消除它)。
目前不可能。
我打开了这个问题https://github.com/spring-projects/spring-kafka/issues/2318
编辑
作为一种解决方法,您可以设置idleEventInterval
并等待第一个ListenerContainerIdleEvent
。
我调查了加里·拉塞尔提出的问题,现在已经解决了。现在可以输入replyingKafkaTemplate.waitforAssignment(Duration.ofSeconds(5))
https://github.com/spring-projects/spring-integration/commit/e19e6d4726a6acd52b81820565a22e8020814bda