npm kafka-node:读取最后消息后关闭consumer



我想写一个函数,一旦主题中的最后一条消息被读取,就会调用回调。

function getCurrentMessages(kafka, topic, cb_done){
  // Start consuming from the beginning
  var consumer = new kafka.Consumer(new kafka.Client(), [{topic: topic, offset: 0}], {fromOffset: true});
  consumer.on('message', function(msg){
    // Do something with msg
  });
  consumer.on('final-message-received', function(){
    consumer.close(function(){
      cb_done();
    });
  });
}

这在当前库中可行吗?我不想让消费者打开接收新消息

我认为您可以使用帮助offset来完成此操作。您可以很容易地在消费者端获得事件偏移的信息。
KAFKA提供了以下提到的偏移量:

1。早期偏移
2. 当前偏移量和
3.最新抵消

你只需要获得Kafka中最后一条消息的最新或偏移量,并在消费事件时将其与当前偏移量进行比较,在这种情况下,当你从Kafka中消费最后一条消息时,你将获得最新的偏移量等于当前偏移量。
你可以写一个IF条件来检查

IF(current_offset == latest_offset)
{
打断你想要的流程或动作;
}

如果不适合您,或者需要任何进一步的信息,请告诉我。

您可以尝试这样做:

consumer.on('message', function (message) {
        console.log("received message", message);
        if(message.offset == (message.highWaterOffset -1)){
          consumer.close(true, function(err, message) {
            console.log("consumer has been closed..");
          });
        }
 });

最新更新