KAFKA流:如何在流媒体代码内获取投票的批处理大小 - 高级消费者



我需要使轮询批量大小为500,并在处理500条消息后进行批处理提交。因此,一组少于500条消息的最后一组,一旦处理了批处理中的最后一条消息,我就需要提交。如果有的话,我是否可以知道在民意调查中获取了多少消息,如果主题中要处理的消息数量恰好小于民意调查大小。

流并没有真正设置以支持这样的用例,尽管经常在"异步处理"的标题下讨论它。

现在,如果您真的想使用流,最好的选择是将您的数据库持久性逻辑包装在自定义处理器或变压器中,这也会在记录中缓冲并在批处理时发送批处理。

但是,如果您真的只需要从一个主题中"复制"数据到数据库中,则可能会直接使用连接器甚至直接使用Kafka消费者获得更多的里程。

希望这会有所帮助!

最新更新