将函数抽象出来并没有使其变得可复用;信息仍保留自上一次调用



我必须在整个项目中多次使用以下代码,因此决定将其抽象到一个 utils 文件中。这样,当我需要它时,只需调用 consumer("topic") 即可。但由于某种原因,该函数无法被重复使用——它只是保留了上一次调用的信息。

var async = require('async')
// kafka-node objects are created once at module load time and shared by
// every consumer() call in this process.
var kafka = require('kafka-node'),
Consumer = kafka.Consumer,
client = new kafka.KafkaClient({ kafkaHost: 'kafka:9092' }),
offset = new kafka.Offset(client)
// NOTE(review): these three are module-level mutable variables shared
// across every consumer() invocation. Concurrent (or even sequential)
// calls clobber each other's state — this is the likely cause of the
// "information retained from the last call" symptom described above.
let messages = []
let latestOffset
let consumerInstance
/**
 * Consumes all pending messages on `topic` (partition 0) starting from the
 * latest committed offset, and resolves with the parsed messages.
 *
 * FIX: `offset.fetch` is asynchronous — the original code constructed the
 * Consumer *before* the fetch callback ran, so `latestOffset` was still 0
 * when the Consumer was created. The fetch is now promisified and awaited.
 * FIX: per the kafka-node docs, the starting offset belongs in the payload
 * key `offset`, enabled by the option `fromOffset: true` (the original put
 * the number under `fromOffset` in the payload, where it is ignored).
 *
 * @param {string} topic - Kafka topic name.
 * @returns {Promise<Array>} parsed messages collected from the topic.
 */
let consumer = async (topic) => {
  messages = []
  latestOffset = 0
  consumerInstance = ""
  // Wait for the latest offset before creating the consumer.
  await new Promise((resolve, reject) => {
    offset.fetch([{ topic: topic, partition: 0, time: -1 }], (errd, data) => {
      // The original silently ignored fetch errors and fell back to
      // offset 0; surface them instead so callers can react.
      if (errd) { reject(errd); return }
      if (data) {
        latestOffset = data[topic][0][0];
      }
      resolve()
    })
  })
  consumerInstance = new Consumer(
    client,
    [{ topic: topic, partition: 0, offset: latestOffset }],
    { autoCommit: true, fromOffset: true },
  );
  // Resolves once the last message (high-water mark - 1) has been seen;
  // rejects on queue-push or consumer errors.
  let KafkaConsumer = new Promise(function (resolve, reject) {
    consumerInstance.on('message', async (message) => {
      // Pause while the work queue handles this message, resume after.
      consumerInstance.pause()
      await q.push(message, function (err) {
        if (err) { reject(err); return }
        consumerInstance.resume()
      })
      if (message.offset == (message.highWaterOffset - 1)) {
        resolve()
      }
    })
    consumerInstance.on('error', async (err) => {
      reject(err)
    })
  })
  await KafkaConsumer
  await q.drain()
  return messages
}
// Work queue with concurrency 1: each Kafka message is handed to
// processTestCase strictly one at a time.
var q = async.queue(async (message, cb) => processTestCase(message, cb), 1)
// Once the queue is fully drained, close the consumer.
// `true` forces a commit before closing.
q.drain(async function () {
  consumerInstance.close(true, function (err) {
    // FIX: the original passed an empty callback, silently swallowing
    // any error raised while closing the consumer.
    if (err) console.error('Error closing Kafka consumer:', err)
  });
});
// Parses one raw Kafka message into the shared `messages` array, then
// signals queue completion via cb.
// FIX: declared with `const` — the original assigned an undeclared
// identifier, creating an implicit global (and a ReferenceError under
// strict mode / ES modules).
const processTestCase = async (message, cb) => {
  messages.push(JSON.parse(message.value))
  cb()
}
module.exports = { consumer }

主要问题是,除非我在 consumerInstance 前面使用 let,否则似乎无法重用该函数;而当我这样做时,又无法在 q.drain 中关闭消费者。非常欢迎任何线索或指引,谢谢!

您之所以会看到重复的消息,是因为 messages 数组与导出函数处于同一作用域。这意味着如果您在任一时刻并发调用 consumer('topic'),这些调用会共享对同一个 messages 的引用——其中一次调用执行 messages = [] 也会影响其他调用收集到的消息。这并不是您想要的行为。

下面是我的重构。我用自己创建的 Promise 风格库替换了 async,并尽量保留了你想实现的所有功能。唯一不确定的是 message.highWaterOffset——如果在那里遇到内存泄漏,还需要做更多处理。

var { pipe, get } = require('rubico')
var kafka = require('kafka-node'),
Consumer = kafka.Consumer
// Safely digs data[topic][0][0] out of the offset-fetch response;
// yields undefined instead of throwing when any level is missing.
const safeParseTopic = (topic, data) => {
  const dig = get([topic, 0, 0])
  return dig(data)
}
// Promise-returning wrapper around kafka-node's callback-style
// Offset#fetch: resolves with the latest offset for `topic`, partition 0.
const fetchLatestOffset = client => topic => new Promise((resolve, reject) => {
  const offsetClient = new kafka.Offset(client)
  const payload = [{ topic: topic, partition: 0, time: -1 }]
  offsetClient.fetch(payload, (err, data) => {
    if (err) {
      reject(err)
    } else {
      resolve(safeParseTopic(topic, data))
    }
  })
})
// it's recommended to create a new client for different consumers
// https://www.npmjs.com/package/kafka-node#consumer
const makeConsumerInstance = client => ({ topic, offset }) => {
  const payloads = [{ topic, offset, partition: 0 }]
  return new Consumer(client, payloads, { autoCommit: true })
}
// this is the function version of KafkaConsumer from your example
// consume(consumerInstance) == KafkaConsumer
// FIX: the original acknowledged (in a comment) that the consumer
// instance was never cleaned up after resolve — a leak if consumer()
// is called repeatedly. The instance is now closed exactly once before
// the promise settles.
const consume = consumerInstance => new Promise((resolve, reject) => {
  const messages = []
  let settled = false
  // Close the consumer, then settle the promise — runs at most once.
  const finish = err => {
    if (settled) return
    settled = true
    consumerInstance.close(() => (err ? reject(err) : resolve(messages)))
  }
  consumerInstance.on('message', message => {
    messages.push(message)
    // you don't need to pause and resume:
    // you are guaranteed one message at a time in this block
    if (message.offset == (message.highWaterOffset - 1)) {
      finish()
    }
  })
  // handles a termination signal from the producer
  consumerInstance.on('end', () => finish())
  consumerInstance.on('error', finish)
})
// topic -> messages
// Resolves the latest offset for the topic, builds a dedicated consumer
// for it, and collects messages until the high-water mark is reached.
const consumer = async topic => {
  // a fresh client per call — kafka-node recommends separate clients
  // for different consumers
  const client = new kafka.KafkaClient({ kafkaHost: 'kafka:9092' })
  const latestOffset = await fetchLatestOffset(client)(topic)
  const consumerInstance = makeConsumerInstance(client)({
    topic: topic,
    offset: latestOffset,
  })
  return consume(consumerInstance)
}
module.exports = { consumer }

最新更新