卡夫卡消费者组:仅阅读新消息



我正在使用kafka-nodeConsumerGroup来读取来自Kafka的消息。我想阅读该主题中的新消息,即不是所有以前未使用的消息,而是仅在创建消费者组后到达的消息。

仅当给定 groupId 没有存储偏移量时,选项fromOffset: 'latest'才会生效。我可以通过在每次创建 ConsumerGroup 时创建一个新的 groupId 来使用它,但是我想避免这种方法,因为这会在 kafka 服务器上创建大量 groupId 和存储的偏移量。

我还尝试手动将偏移量设置为-1如下所示:

offset = new kafka.Offset(consumer.client);
offset.commit(groupId, [{ topic: topic, partition: 0, offset: -1 }],
function (err, data) {
logger.debug("tried to set offset: ", data);
if (err) logger.error("error", err);
})
);

这将返回一个错误代码 0,这应该是成功的,但消费者组随后会收到旧消息。

卡夫卡版本:0.10.0 卡夫卡节点:1.6.2

使用选项启动 kafka 消费者组:

autoCommit: false,   
fromOffset: 'latest'

创建一个永远不会向 Kafka 提交偏移量的消费者组(除非您手动执行此操作,这是不需要的(。因此,它在启动时将始终选择"最新"选项。

commit(groupId, payloads, cb)

offset.commit 的第一个参数应该是 groupId,而不是 clientId。

应将使用者的组名传递给它。

对于所有注意到即使将"autoCommit"设置为"false",上述解决方案也不起作用并且仍然为组存储偏移量的人来说,这只是一个小更新: 请务必设置其他标志:commitOffsetsOnFirstJoin: false在选项对象中。如果未设置标志,则偏移量将按组存储,一旦第二次启动,它将获得从此偏移量开始的所有消息

我希望这将有助于节省(很多(时间;)

最新更新