我有一个节点14服务器,初始化如下:
import express, { Express } from 'express';
import kafkaConsumer from './modules/kafkaConsumer';
async function bootstrap(): Promise<Express> {
kafkaConsumer();
const app = express();
app.get('/health', (_req, res) => {
res.send('ok');
});
return app;
}
export default bootstrap;
kafkaConsumer
代码:
import logger from './logger.utils';
import KafkaConnector from '../connectors/kafkaConnector';
// singleton
const connectorInstance: KafkaConnector = new KafkaConnector('kafka endpoints', 'consumer group name');
// creating consumer and producer outside of main function in order to not initialize a new consumer producer per each new call.
(async () => {
await connectorInstance.createConsumer('consumer group name');
})();
const kafkaConsumer = async (): Promise<void> => {
const kafkaConsumer = connectorInstance.getConsumer();
await kafkaConsumer.connect();
await kafkaConsumer.subscribe({ topic: 'topic1', fromBeginning: true });
await kafkaConsumer.run({
autoCommit: false, // cancel auto commit in order to control committing
eachMessage: async ({ topic, partition, message }) => {
const messageContent = message.value ? message.value.toString() : '';
logger.info('received message', {
partition,
offset: message.offset,
value: messageContent,
topic
});
// commit message once finished all processing
await kafkaConsumer.commitOffsets([ { topic, partition, offset: message.offset } ]);
}
});
};
export default kafkaConsumer;
您可以看到,在kafkaConsumer
模块中有一个异步函数,它在请求时被调用以初始化使用者实例。
如何保证在导入模块时成功通过?
此外,在导入模块时,这是否意味着kafkaConsumer
默认函数会被自动调用?这不会导致服务器在启动时基本上被卡住吗?
非常感谢这里的指导,提前谢谢。
Twicked了Kafka初始化,并使用本地Kafka进行了测试。一切如预期。