等待Kafka消费者在请求/回复语义中初始化偏移量



我正在使用一个共享回复主题(即组)的ReplyingKafkaTemplate。id为null,组管理已禁用)。初始偏移量设置为'latest'。

我面临着以下问题:

  1. 我手动启动侦听器容器,它会导致一个异步操作,为消费者获取元数据。
  2. 我调用sendAndReceive()方法
  3. 在第一步异步操作返回之前创建回复消息。
  4. 异步操作返回并设置偏移为1(由于'latest'),因此应答消息将被忽略。

为了解决这个问题,我需要断言在调用sendAndReceive()方法之前初始化偏移量。我已经寻找了一个合适的应用程序事件,但它们似乎都不合适。

是否有任何方法可以断言消费者已经完全初始化?

这个问题类似于等待kafka消费者在向kafka spring boot写入消息之前进行轮询,但建议的解决方案也不能解决这个问题(除非我误解了这个建议,它只是降低了风险,但不能消除它)。

目前不可能。

我打开了这个问题https://github.com/spring-projects/spring-kafka/issues/2318

编辑

作为一种解决方法,您可以设置idleEventInterval并等待第一个ListenerContainerIdleEvent

我调查了加里·拉塞尔提出的问题,现在已经解决了。现在可以输入replyingKafkaTemplate.waitforAssignment(Duration.ofSeconds(5))

https://github.com/spring-projects/spring-integration/commit/e19e6d4726a6acd52b81820565a22e8020814bda

最新更新