如何使用Kafka主题的最新消息?



我使用KaafkaJS连接和消费kafka消息。我使用下面的代码来使用消息。但是它获取了所有的消息,但是我想把最后一条消息存储在变量messageValue中。有人能帮我一下吗?

await consumer.run({ 
eachMessage: async (data) => {
messageValue = data.message.value.toString('utf8').trim()
}
})
console.log(messageValue)

你只需要添加frombegining: false

https://kafka.js.org/docs/consuming a-name-from-beginning-a-frombeginning

下面的代码为我工作。参考

const consume = async () => {
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning: true })
await consumer.run({
eachMessage: ({ message }) => { 
console.log(`received message: ${message.value}`)
},
})
}

相关内容

  • 没有找到相关文章

最新更新