节点-导入模块中的异步函数



我有一个节点14服务器,初始化如下:

import express, { Express } from 'express';
import kafkaConsumer from './modules/kafkaConsumer';
async function bootstrap(): Promise<Express> {
kafkaConsumer();
const app = express();
app.get('/health', (_req, res) => {
res.send('ok');
});
return app;
}
export default bootstrap;

kafkaConsumer代码:

import logger from './logger.utils';
import KafkaConnector from '../connectors/kafkaConnector';
// singleton
const connectorInstance: KafkaConnector = new KafkaConnector('kafka endpoints', 'consumer group name');
// creating consumer and producer outside of main function in order to not initialize a new consumer producer per each new call.
(async () => {
await connectorInstance.createConsumer('consumer group name');
})();
const kafkaConsumer = async (): Promise<void> => {

const kafkaConsumer = connectorInstance.getConsumer();
await kafkaConsumer.connect();
await kafkaConsumer.subscribe({ topic: 'topic1', fromBeginning: true });
await kafkaConsumer.run({
autoCommit: false, // cancel auto commit in order to control committing
eachMessage: async ({ topic, partition, message }) => {
const messageContent = message.value ? message.value.toString() : '';
logger.info('received message', {
partition,
offset: message.offset,
value: messageContent,
topic
});
// commit message once finished  all processing
await kafkaConsumer.commitOffsets([ { topic, partition, offset: message.offset } ]);
}
});
};
export default kafkaConsumer;

您可以看到,在kafkaConsumer模块中有一个异步函数,它在请求时被调用以初始化使用者实例。

如何保证在导入模块时成功通过?

此外,在导入模块时,这是否意味着kafkaConsumer默认函数会被自动调用?这不会导致服务器在启动时基本上被卡住吗?

非常感谢这里的指导,提前谢谢。

Twicked了Kafka初始化,并使用本地Kafka进行了测试。一切如预期。

最新更新