如何用kafka流手动承诺



有没有一种方法可以用kafka流手动提交?

通常在使用KafkaConsumer时,我会做类似以下操作的事情:

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records){
       // process records
    }
   consumer.commitAsync();
}

我在哪里手动求职。我看不到KStream的类似API。

提交由内部和全自动的流来处理,因此通常没有理由手动提交。请注意,流式传输的处理方式与消费者自动销售相比 - 实际上,对内部使用的消费者禁用自动信号,并且流媒体管理"手动"提交。原因是,提交只能在处理过程中的某些时刻发生,以确保没有数据丢失(在更新状态和冲洗结果方面存在许多内部依赖性(。

对于更频繁的提交,您可以通过StreamsConfig参数commit.interval.ms

减少提交间隔。

尽管如此,通过低级处理器API,手动提交是间接的。您可以使用通过init()方法提供的context对象来调用context#commit()。请注意,这只是尽快提交提交的"访问请求" - 它不是直接发出提交。

相关内容

  • 没有找到相关文章

最新更新