当把 consumer.on('message', callback) 放入另一个回调中时,无法从头开始消费消息



我将消息从kafka消费者推送到mongodb。

  • 如果我把MongoClient.connect(url, function(err, client) {})放在consumer.on('message', callback)的回调中,消费者能够从头开始获取消息。

示例代码:

// For each Kafka message: call MongoDB.connectDB, then pass the message and
// the 'transaction' collection to insertMessage.
consumer.on('message', (message) => {
  MongoDB.connectDB((connectError) => {
    if (connectError) {
      throw connectError;
    }
    const collectionKafka = MongoDB.getDB().collection('transaction');
    // Anything insertMessage throws propagates unchanged out of this callback.
    insertMessage(message, collectionKafka);
  });
});
  • 但是,如果我把consumer.on('message', callback)放在MongoClient.connect(url, function(err, client) {})里面,消费者就无法从头开始获取消息。

使用下面的代码,消费者只能消费最新的消息,无法从头开始消费。如何解决此问题?

// Question script (first attempt): the Kafka consumer is built at module level
// (last statement), while its 'message'/'error' handlers are attached only
// from inside the MongoClient.connect callback.
const kafka = require('kafka-node');
const mongo = require('mongodb');
const assert = require('assert');
// Offset is destructured but not used anywhere in this snippet.
const { Consumer, Offset, KafkaClient } = kafka;
const { MongoClient } = mongo;
// Not referenced below; the topic name is repeated literally in `topics`.
const topic = 'testprocessing';
const url = 'mongodb://localhost:27017';
const dbName = 'test_kafka_processing';
// Assigned inside the connect callback once the database handle is available.
let mongodb;
const client = new KafkaClient({kafkaHost: 'localhost:9092'});
// Fetch request: partition 0 of 'testprocessing', starting at offset 0.
const topics = [{
topic: 'testprocessing',
offset: 0,
partition: 0
}];
const options = {
autoCommit: false,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024,
// Per the kafka-node README, tells the consumer to honour the `offset` given in `topics`.
fromOffset: true
};
// NOTE: the callback parameter `client` shadows the KafkaClient `client` above.
MongoClient.connect(url, function(err, client) {
// Throws if the connection attempt reported an error.
assert.equal(null, err);
mongodb = client.db(dbName);
// The handlers are attached only here, after the MongoDB connection is ready.
consumer.on('message', (message) => {
const collection = mongodb.collection('transaction');
// Parse the message payload as JSON and insert it as a single document.
let msg = JSON.parse(message.value);
collection.insertOne(msg,
function(err, result) {
assert.equal(err, null);
console.log("Inserted message into the collection");
//callback(result);
});
});
consumer.on('error', (err) => {
console.log('error', err);
});
});
// Declared after the connect call: the callback above can only use `consumer`
// once this line has run (a synchronous invocation would hit the temporal dead
// zone), so the consumer already exists before any handler is attached to it.
const consumer = new Consumer(client, topics, options);

即使我把consumer的初始化放在MongoClient.connect的回调中,消费者仍然无法获取消息:

// Question script (second attempt): the Kafka consumer is now constructed
// inside the MongoClient.connect callback, right before its handlers are attached.
const kafka = require('kafka-node');
const mongo = require('mongodb');
const assert = require('assert');
// Offset is destructured but not used anywhere in this snippet.
const { Consumer, Offset, KafkaClient } = kafka;
const { MongoClient } = mongo;
// Not referenced below; the topic name is repeated literally in `topics`.
const topic = 'testprocessing';
const url = 'mongodb://localhost:27017';
const dbName = 'test_kafka_processing';
// Assigned inside the connect callback once the database handle is available.
let mongodb;
const client = new KafkaClient({kafkaHost: 'localhost:9092'});
// Fetch request: partition 0 of 'testprocessing', starting at offset 0.
const topics = [{
topic: 'testprocessing',
offset: 0,
partition: 0
}];
const options = {
autoCommit: false,
fetchMaxWaitMs: 1000,
fetchMaxBytes: 1024 * 1024,
// Per the kafka-node README, tells the consumer to honour the `offset` given in `topics`.
fromOffset: true
};
// NOTE: the callback parameter `client` shadows the KafkaClient `client` above.
MongoClient.connect(url, function(err, client) {
// Throws if the connection attempt reported an error.
assert.equal(null, err);
mongodb = client.db(dbName);
// NOTE(review): inside this callback `client` is the callback's own parameter
// (the value handed over by MongoClient.connect), not the KafkaClient declared
// at module level, so the Consumer is constructed with the wrong client object.
const consumer = new Consumer(client, topics, options); // <== the consumer is now created here, inside the callback
consumer.on('message', (message) => {
const collection = mongodb.collection('transaction');
// Parse the message payload as JSON and insert it as a single document.
let msg = JSON.parse(message.value);
collection.insertOne(msg,
function(err, result) {
assert.equal(err, null);
console.log("Inserted message into the collection");
//callback(result);
});
});
consumer.on('error', (err) => {
console.log('error', err);
});
});

消费者没有以"暂停"模式初始化。因此,消费者会在MongoDB连接建立之前、甚至在.on('message')处理程序注册之前,就开始获取Kafka消息。

解决方法是在消费者的选项对象中设置paused选项标志。

链接到库代码行

// Answer script: create the consumer in paused mode so that no Kafka message is
// fetched until the MongoDB connection exists and the 'message' handler is
// attached; fetching starts with consumer.resume().
// Relies on `client` (KafkaClient), `topics`, `url`, `dbName` and `mongodb`
// as declared in the question's script.
const options = {
  autoCommit: false,
  fetchMaxWaitMs: 1000,
  fetchMaxBytes: 1024 * 1024,
  fromOffset: true,
  paused: true // <-- "start the consumer paused"
};
const consumer = new Consumer(client, topics, options);

// Attach the error handler right away: an 'error' event emitted while no
// listener is registered (e.g. before MongoDB has connected) would otherwise
// be thrown and terminate the process.
consumer.on('error', (err) => {
  console.log('error', err);
});

// The callback parameter is named `mongoClient` so that it does not shadow the
// KafkaClient `client` used to build the consumer above.
MongoClient.connect(url, function(err, mongoClient) {
  // Rethrows the connection error, if any.
  assert.ifError(err);
  mongodb = mongoClient.db(dbName);
  // Look the collection up once instead of on every message.
  const collection = mongodb.collection('transaction');

  consumer.on('message', (message) => {
    let msg;
    try {
      msg = JSON.parse(message.value);
    } catch (parseErr) {
      // A malformed payload must not take the whole consumer down; log and skip it.
      console.log('error', parseErr);
      return;
    }
    // Insert the parsed message as a single document.
    collection.insertOne(msg, function(insertErr) {
      assert.ifError(insertErr);
      console.log("Inserted message into the collection");
    });
  });

  consumer.resume(); // <-- the consumer starts processing kafka messages after a MongoDB connection has been established
});

最新更新