在我的一个用例中,包括使用数据,执行一些操作并将其生成为新主题。
我正在使用https://www.npmjs.com/package/kafkajs npm库。
我希望在操作成功后手动提交偏移量,以避免任何数据丢失。我使用autoCommit: false
来避免数据在消费后自动提交。
consumer.commitOffsets([
{ topic: 'topic-A', partition: 0, offset: '1' }
])
正如我在某处读到的,如果我们有意地提交每个偏移量(在消费后立即提交偏移量),那么它将在代理上创建负载,这是不好的。
我需要kafka专家建议建议最好的方法对我的上述用例,以避免任何数据丢失?请通知
为了手动处理提交,下面是代码。
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
...
await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }]);
},
});