我正在使用kafka-node
ConsumerGroup来读取来自Kafka的消息。我想阅读该主题中的新消息,即不是所有以前未使用的消息,而是仅在创建消费者组后到达的消息。
仅当给定 groupId 没有存储偏移量时,选项fromOffset: 'latest'
才会生效。我可以通过在每次创建 ConsumerGroup 时创建一个新的 groupId 来使用它,但是我想避免这种方法,因为这会在 kafka 服务器上创建大量 groupId 和存储的偏移量。
我还尝试手动将偏移量设置为-1
如下所示:
offset = new kafka.Offset(consumer.client);
offset.commit(groupId, [{ topic: topic, partition: 0, offset: -1 }],
function (err, data) {
logger.debug("tried to set offset: ", data);
if (err) logger.error("error", err);
})
);
这将返回一个错误代码 0,这应该是成功的,但消费者组随后会收到旧消息。
卡夫卡版本:0.10.0 卡夫卡节点:1.6.2
使用选项启动 kafka 消费者组:
autoCommit: false,
fromOffset: 'latest'
创建一个永远不会向 Kafka 提交偏移量的消费者组(除非您手动执行此操作,这是不需要的(。因此,它在启动时将始终选择"最新"选项。
commit(groupId, payloads, cb)
offset.commit 的第一个参数应该是 groupId,而不是 clientId。
应将使用者的组名传递给它。
对于所有注意到即使将"autoCommit"设置为"false",上述解决方案也不起作用并且仍然为组存储偏移量的人来说,这只是一个小更新: 请务必设置其他标志:commitOffsetsOnFirstJoin: false在选项对象中。如果未设置标志,则偏移量将按组存储,一旦第二次启动,它将获得从此偏移量开始的所有消息
我希望这将有助于节省(很多(时间;)