卡夫卡 - 阅读一包消息



>我有一个批处理作业,它将数据填充到 Kafka 主题。每条消息都有数据和作业标识符。 在消费者方面,我只想阅读属于此工作的消息。在作业完成并使用所有消息后,使用者端必须执行一些后处理。

1( 如果保证在作业期间不会生成其他消息,我如何理解作业已完成并且作业生成的所有消息都被消耗了?(考虑到多个分区和临时性(。

2(如果不能保证在工作期间不会产生其他消息,我相信可以跳过噪音。

谢谢

我假设job_id是恒定的。在这种情况下,您可以在消费者中检查n如果后续轮询从 Kafka 返回空记录,则关闭。n将取决于引入速率和使用者轮询间隔。

我在这里只谈论第一种情况。请注意,这只是一个想法,我自己从未尝试过

您可以使用endOffsets()获取所有分区的最后一个偏移量,然后在每条消息之后遍历所有偏移量,以检查所有当前偏移量是否与结束偏移量匹配。如果一切都是匹配的,你已经到达了终点。

相关内容

最新更新