发布事件在生产者准备就绪之前被触发



我已经编写了一个节点模块来连接到Kafka。

kafka-connect.js

var kafka = require('kafka-node');
var Producer = kafka.Producer,
client = new kafka.Client(),
producer = new Producer(client);
module.exports = {
producer
};

KafkaService.js

const {producer} = require('./kafka-connect');
producer.on('error', function (err) {
console.log('Producer is in error state');
console.log(err);
});
producer.on('ready', function () {
console.log('Producer is ready');
});
const KafkaService = {
sendRecord: (kafkaTopic, data, callback = (err, data) => console.log(err)) => {
var sendingData = {};
sendingData.event_data = JSON.stringify(data);
sendingData.event_type = 6;
const record = [
{
topic: kafkaTopic,
messages: sendingData,
partition : 0
}
];
producer.send(record, callback);
}
};
module.exports = {
KafkaService
};

现在我用这两个来向卡夫卡发布数据。以下是这样做的代码:

const {KafkaService} = require('../kafka/KafkaService');
const {newOrder} = require('../objs/newOrderEvent');
KafkaService.sendRecord("incentive_order_data", newOrder);

但运行此文件会出现错误:

{ BrokerNotAvailableError: Broker not available
at new BrokerNotAvailableError (/Users/rajat.mishra/self/nodekafka/node_modules/kafka-node/lib/errors/BrokerNotAvailableError.js:11:9)
at Client.loadMetadataForTopics (/Users/rajat.mishra/self/nodekafka/node_modules/kafka-node/lib/client.js:389:15)
at Client.send (/Users/rajat.mishra/self/nodekafka/node_modules/kafka-node/lib/client.js:562:10)
at /Users/rajat.mishra/self/nodekafka/node_modules/kafka-node/lib/client.js:241:10
at /Users/rajat.mishra/self/nodekafka/node_modules/async/dist/async.js:473:16
at iteratorCallback (/Users/rajat.mishra/self/nodekafka/node_modules/async/dist/async.js:1064:13)
at /Users/rajat.mishra/self/nodekafka/node_modules/async/dist/async.js:969:16
at buildRequest (/Users/rajat.mishra/self/nodekafka/node_modules/kafka-node/lib/client.js:257:24)
at /Users/rajat.mishra/self/nodekafka/node_modules/async/dist/async.js:3110:16
at eachOfArrayLike (/Users/rajat.mishra/self/nodekafka/node_modules/async/dist/async.js:1069:9) message: 'Broker not available' }
Producer is ready

显然,发布方法在生产者准备好之前就被调用了。我无法想出解决这个问题的办法。一种方法是把承诺带到画面中,但这只是我的假设,确切的方法可能会有所不同。

您不需要等待生产商做好准备。

你需要做这个

producer.on('ready', function () {
console.log('Producer is ready');
// send data here 
});

相关内容

最新更新