我使用KaafkaJS连接和消费kafka消息。我使用下面的代码来使用消息。但是它获取了所有的消息,但是我想把最后一条消息存储在变量messageValue
中。有人能帮我一下吗?
await consumer.run({
eachMessage: async (data) => {
messageValue = data.message.value.toString('utf8').trim()
}
})
console.log(messageValue)
你只需要添加frombegining: false
https://kafka.js.org/docs/consuming a-name-from-beginning-a-frombeginning
下面的代码为我工作。参考
const consume = async () => {
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning: true })
await consumer.run({
eachMessage: ({ message }) => {
console.log(`received message: ${message.value}`)
},
})
}