我们正在使用NestJS作为基于微服务架构的Typescript框架。我们的一些部署是我们所谓的"Kafka worker",它运行的代码实际上不暴露任何REST端点,而只是监听Kafka主题并处理传入事件。
问题是,全局异常过滤器配置为希望捕获任何抛出异常,但没有捕获任何东西(我们最终以节点UnhandledPromiseRejection
结束)
异常过滤器基本是这样配置的(遵循NestJS文档指南):
@Catch()
export class KafkaWorkerExceptionFilter implements ExceptionFilter {
private logger: AppLogger = new AppLogger(KafkaWorkerExceptionFilter.name);
catch(error: Error, host: ArgumentsHost): void {
this.logger.error('Uncaught exception', error);
}
}
我们的控制器是这样配置的:
@Controller()
export class KafkaWorkerController {
private readonly logger = new AppLogger(KafkaWorkerController.name);
constructor(
) {
this.logger.log('Init');
}
@EventPattern(KafkaTopic.PiiRemoval)
async removePiiForTalent(data: IncomingKafkaMessage): Promise<void> {
await asyncDoSomething();
throw new Error('Business logic failed');
}
}
现在,我们期望全局异常过滤器捕获从控制器处理函数内部抛出的错误(以及从嵌套在其中的同步/异步操作的实际函数抛出的实际错误)。这是不可能的。
再一次,跟随NestJS文档实现这样一个过滤器,我尝试了许多方法,以及各种方法的组合来"注册"过滤器,没有成功:
- 列为顶级模块定义中的提供者
{ provide: APP_FILTER, useClass: KafkaWorkerExceptionFilter }
- 在控制器类 上面使用
- 在使用
app.connectMicroservice(...)
和kafka配置之前/之后在main.ts
文件上使用nest的app.useGlobalFilters(new KafkaWorkerExceptionFilter());
@UseFilters(KafkaWorkerExceptionFilter)
装饰器只是作为我们如何在"kafka-worker"中初始化应用的参考下面是main.ts
文件
async function bootstrap() {
const app = await NestFactory.create(KafkaWorkerAppModule, {
logger: ['error', 'warn', 'debug', 'log', 'verbose'],
});
app.use(Helmet());
app.useGlobalPipes(
new ValidationPipe({
disableErrorMessages: false,
whitelist: true,
transform: true,
}),
);
const logger: AppLogger = new AppLogger('Bootstrap');
const config: ConfigService = app.get(ConfigService);
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
clientId: SECRET_VALUE,
brokers: [SECRET_HOST_ADDRESS],
ssl: true,
sasl: SOME_BOOLEAN_VALUE
? {
mechanism: 'plain',
username: SECRET_VALUE,
password: SECRET_VALUE,
}
: undefined,
},
consumer: {
allowAutoTopicCreation: false,
groupId: SECRET_VALUE,
},
},
});
await app.startAllMicroservices();
const port = config.servicePort || 3000;
await app.listen(port, () => {
logger.log(`Kafka Worker listening on port: ${port}`);
logger.log(`Environment: ${config.nodeEnv}`);
});
}
bootstrap();
当使用connectMicroservice()
方法时,您正在创建一个混合应用程序。
默认情况下,混合型应用不会继承为主应用(基于http)配置的全局管道、拦截器、守卫和过滤器。要从主应用程序继承这些配置属性,请在connectMicroservice()调用的第二个参数(一个可选选项对象)中设置inheritAppConfig属性,如下所示:
const microservice = app.connectMicroservice({ transport: Transport.TCP }, { inheritAppConfig: true });
所以在这种情况下你需要做的就是:
- 在这些方法之一中添加过滤器:
- 作为
APP_FILTER
到KafkaWorkerAppModule
- 在
main.ts
中使用 - 在每个相关提供者上使用
@UseFilters(KafkaWorkerExceptionFilter)
-我将避免使用全局过滤器
app.useGlobalFilters(new KafkaWorkerExceptionFilter())
作为全局滤波器 - 作为
- 将
inheritAppConfig
选项添加到app.connectMicroservice()
中的main.ts
:
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
clientId: SECRET_VALUE,
brokers: [SECRET_HOST_ADDRESS],
ssl: true,
sasl: SOME_BOOLEAN_VALUE
? {
mechanism: 'plain',
username: SECRET_VALUE,
password: SECRET_VALUE,
}
: undefined,
},
consumer: {
allowAutoTopicCreation: false,
groupId: SECRET_VALUE,
},
},
},
{ inheritAppConfig: true }
);