Kafka 使用者在暂停和恢复后不消耗消息



我正在使用这个node-rdkafka库来实现节点 kafka,并使用消费者暂停和恢复方法来处理背压。我已经创建了一个小演示,我可以在其中pause the consumerresume the consumer但问题是在resume the consumer之后 它停止了消耗消息。

这是我的代码。

const Kafka = require('node-rdkafka');
const topic = 'create_user_channel';
const log_divider = '-----------------------------------';
const consumer = new Kafka.KafkaConsumer({
'group.id':'gsuite_consumer',
'metadata.broker.list': '*******',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '********',
'sasl.password': '********',
'security.protocol': 'SASL_SSL',
'enable.auto.commit':false
}, {});
// Connect the consumer.
consumer.connect({timeout: "1000ms"}, (err) => {
if (err) {
console.log(`Error connecting to Kafka broker: ${err}`);
process.exit(-1);
}
console.log("Connected to Kafka broker");
});
consumer.on('disconnected', (args) => {
console.error(`Consumer got disconnected: ${JSON.stringify(args)}`);
});
let max_queue_size = 3;
let current_queue = [];
let is_pause = false;
// register ready handler.
consumer.on('ready', (arg)=>{
console.log('consumer ready.' + JSON.stringify(arg));
console.log('Consumer is ready');
consumer.subscribe([topic]);
setInterval(function() {
console.log('consumer has consume on :'+timeMs());  
consumer.consume();
}, 1000);
});
consumer.on('data',async (data)=>{
console.log('************consumer is consuming data***********:'+timeMs());
if(!is_pause) {
is_pause = true;
if(data && typeof data !== 'undefined') {
try {
console.log('consumer has received the data:'+timeMs());
consumer.pause([topic]);
console.log('consumer has pause the consuming:'+timeMs());
await processMessage(data);
console.log('consumer is resumed:'+timeMs());
consumer.resume([topic]);
console.log(log_divider);
is_pause = false;
} catch(error) {
console.log('data consuming error');
console.log(error);
}
} else {
is_pause = false;
}
}
});
async function processMessage(data) {
// await print_bulk(data);
await processData(0,data);
}
async function print_bulk(data) {
for(var i=0;i<data.length;i++) {
await processData(i,data[i]);
}
}

/**
* Wait specified number of milliseconds.
* @param ms
*/
async function wait(ms) {
console.log('wait for the 3 sec');
return new Promise((resolve) => setTimeout(resolve, ms));
}
var timeMs = ()=> {
var d = new Date();
var h = addZero(d.getHours(), 2);
var m = addZero(d.getMinutes(), 2);
var s = addZero(d.getSeconds(), 2);
var ms = addZero(d.getMilliseconds(), 3);
return h + ":" + m + ":" + s + ":" + ms;
}
var addZero = (x, n)=> {
while (x.toString().length < n) {
x = "0" + x;
}
return x;
}
async function processData(i,m) {
if (m) {
console.log('processing a data start:'+timeMs());
console.log('Received a message:');
console.log('  message: ' + m.value.toString());
console.log('  key: ' + m.key);
console.log('  size: ' + m.size);
console.log('  topic: ' + m.topic);
console.log('  offset: ' + m.offset);
console.log('  partition: ' + m.partition);
consumer.commitMessage(m);
}
await wait(3000);
console.log('process a data completed:'+timeMs());
// delete current_queue[i];
// console.log('after delting lenght of current queue:'+current_queue.length);
// console.log(log_divider);
return true;
}

谁能帮助我,我在恢复消费者时做错了什么?当我启动消费者时,它只收到一条消息,恢复后它仍然没有消耗任何进一步的消息。

我已经解决了这个问题。除了consumer.pause()consumer.resume()方法,我还需要使用consumer.assignments()方法。

所以会是这样的

consumer.pause(consumer.assignments());
consumer.resume(consumer.assignments());

最新更新