有没有一种方法可以用kafka流手动提交?
通常在使用KafkaConsumer
时,我会做类似以下操作的事情:
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records){
// process records
}
consumer.commitAsync();
}
我在哪里手动求职。我看不到KStream
的类似API。
提交由内部和全自动的流来处理,因此通常没有理由手动提交。请注意,流式传输的处理方式与消费者自动销售相比 - 实际上,对内部使用的消费者禁用自动信号,并且流媒体管理"手动"提交。原因是,提交只能在处理过程中的某些时刻发生,以确保没有数据丢失(在更新状态和冲洗结果方面存在许多内部依赖性(。
对于更频繁的提交,您可以通过StreamsConfig
参数commit.interval.ms
。
尽管如此,通过低级处理器API,手动提交是间接的。您可以使用通过init()
方法提供的context
对象来调用context#commit()
。请注意,这只是尽快提交提交的"访问请求" - 它不是直接发出提交。