Kafka消费者手动偏移提交



在我的一个用例中,包括使用数据,执行一些操作并将其生成为新主题。

我正在使用https://www.npmjs.com/package/kafkajs npm库。

我希望在操作成功后手动提交偏移量,以避免任何数据丢失。我使用autoCommit: false来避免数据在消费后自动提交。

这是手动提交偏移量的代码
consumer.commitOffsets([
{ topic: 'topic-A', partition: 0, offset: '1' }
])

正如我在某处读到的,如果我们有意地提交每个偏移量(在消费后立即提交偏移量),那么它将在代理上创建负载,这是不好的。

我需要kafka专家建议建议最好的方法对我的上述用例,以避免任何数据丢失?请通知

为了手动处理提交,下面是代码。

await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
...
await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
},
});

相关内容

  • 没有找到相关文章

最新更新