如何使用kafkajs npm从kafka主题计数或批处理模型中消费消息



我需要从Kafka topic中消费消息。

例如,我的主题有一千条记录。我需要使用消息计数基础。作为参数计数。例如,如果我给count 100,那么只消耗100条消息,next循环消耗下100条需要消耗的记录。

如何使用kafkajs.

我回答我自己的问题,这有助于我做批处理。

下面的代码示例消耗数据5秒,将消耗的数据推入一个数组,然后做处理,处理完成后恢复consumer

var { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'consumer',
brokers: ['kafka:0']
})
const consumer = kafka.consumer({ groupId: 'test_topic_groups', allowAutoTopicCreation: true })
const run = async () => {
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
var isPaused = false;
var data = []
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
data.push(message)
if (!isPaused) {
isPaused = true;
setTimeout(async () => {
consumer.pause([{ "topic": "test-topic" }])
doyourProcessandResume(data)
console.log('Consumer paused...........')
}, 5000)
}
}
})
function doyourProcessandResume(data) {
// Do the process here , now i put timeout only then resume the consumer
setTimeout(() => {
isPaused = false
consumer.resume([{ "topic": 'test-topic' }]);
console.log('Consumer resumed..............')
}, 2000)
}
}
run().catch(console.error)

最新更新