我想写一个函数,一旦主题中的最后一条消息被读取,就会调用回调。
function getCurrentMessages(kafka, topic, cb_done){
// Start consuming from the beginning
var consumer = new kafka.Consumer(new kafka.Client(), [{topic: topic, offset: 0}], {fromOffset: true});
consumer.on('message', function(msg){
// Do something with msg
});
consumer.on('final-message-received', function(){
consumer.close(function(){
cb_done();
});
});
}
这在当前库中可行吗?我不想让消费者打开接收新消息
我认为您可以使用帮助offset来完成此操作。您可以很容易地在消费者端获得事件偏移的信息。
KAFKA提供了以下提到的偏移量:
1。早期偏移
2. 当前偏移量和
3.最新抵消
你只需要获得Kafka中最后一条消息的最新或偏移量,并在消费事件时将其与当前偏移量进行比较,在这种情况下,当你从Kafka中消费最后一条消息时,你将获得最新的偏移量等于当前偏移量。
你可以写一个IF条件来检查
IF(current_offset == latest_offset)
{
打断你想要的流程或动作;
}
如果不适合您,或者需要任何进一步的信息,请告诉我。
您可以尝试这样做:
consumer.on('message', function (message) {
console.log("received message", message);
if(message.offset == (message.highWaterOffset -1)){
consumer.close(true, function(err, message) {
console.log("consumer has been closed..");
});
}
});