Ready事件上的Kafka节点没有触发



我已经为Kafka客户端创建了Singleton类,并且仅创建一个对象。我需要多次发布相同的主题,而无需创建新的客户端和生产者实例。我找到了producer.on('ready',fn(({}(没有使用同一客户端和生产者实例触发,只有在我有了新客户端和生产者对象时,它首次触发。

在这里示例代码:

singleton类:

const kafka = require('kafka-node');
const logger = require('./../../../../applogger');
const kafkaConfig = require('./../../../../config/config');
function ConnectionProvider() {
    let kafkaConnection = undefined;
    let client = undefined;
    this.getConnection = () => {
        if (!this.kafkaConnection) {
            logger.info("Creating new kafka connection ------------------------------------- ");
            this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST);
            this.kafkaConnection = new kafka.Producer(this.client);
        }
        return this.kafkaConnection;
    };
    this.getClient = () => {
        if (!this.client) {
            logger.info("Creating new kafka Client ------------------------------------- ");
            this.client = new kafka.Client(kafkaConfig.ZOOKPER_HOST);
        }
        return this.client;
    }
    process.on('SIGINT', function() {
        logger.info("Going to terminate kafka connection...!");
        process.exit(0);
    });
}
module.exports = exports = new ConnectionProvider;

主题发布方法:

const kafkaClient = require('./../core/kafkaConnection');
    const publishToKafka = function(dataPayload, callback) {
        logger.debug('Publishing to topic ', topicName, ' with data: ', dataPayload);
        let producer = kafkaClient.getConnection();
        producer.on('ready', function() {
            let payloads = dataPayload;
            producer.send(payloads, function(err, data) {
                if (err) {
                    logger.error(
                        'Error in publishing message to messaging pipeline ', err
                    );
                    callback(err, null);
                    return;
                }
                logger.debug('Published message to messaging pipeline topic ', topicName, ' with result: ', data);
                callback(null, data);
                return;
            });
        });
        producer.on('error', function(err) {
            logger.error(
                'Error in publishing message to messaging pipeline ', err
            );
            producer.close();
        });
    };

datapayload是:令datapayload = [{主题:weedopic,消息:somemessage}]

我需要多次调用PublishTokafka方法,但只想创建一个Kafka客户端和生产者实例。但是生产者没有发布主题,因为producer.on('ready',function(({}(在使用客户端和生产者的相同对象时不会触发。

预先感谢。

我通过在每次通话后关闭Kafka生产者和客户端实例来解决此问题,因为我需要多次发布到Kafka生产者,但默认情况下,Kafka Zookeeper只允许60 max连接(我们可以增加(我们可以增加(如果需要的话,连接的价值(。这就是为什么为单个Kafka实例创建Singleton类的原因。

但是,在创建了Kafka的单个实例之后,它的producer.on.on('Ready'(事件不会触发,因为第二次我们使用已经处于准备状态的Kafka生产者的相同对象。因此,我们每次都需要新的生产者实例。

const publishToKafka = function(topicName, dataPayload, callback) {
    logger.debug('Publishing to topic ', topicName, ' with data: ', dataPayload);
    let client = new kafka.Client(kakfaConfig.ZOOKPER_HOST);
    let producer = new kafka.Producer(client);

    producer.on('ready', function() {
        let payloads = dataPayload;
        producer.send(payloads, function(err, data) {
            if (err) {
                logger.error(
                    'Error in publishing message to messaging pipeline ', err
                );
                callback(err, null);
                return;
            }
            logger.debug('Published message to messaging pipeline topic ', topicName, ' with result: ', data);
            producer.close();
            client.close();
            callback(null, data);
            return;
        });
    });
    producer.on('error', function(err) {
        logger.error(
            'Error in publishing message to messaging pipeline ', err
        );
        producer.close();
    });
};

无需为单个对象创建单例类。

最新更新