NestJS:全局异常过滤器不捕捉任何从基于kafka的微服务应用抛出的东西



我们正在使用NestJS作为基于微服务架构的Typescript框架。我们的一些部署是我们所谓的"Kafka worker",它运行的代码实际上不暴露任何REST端点,而只是监听Kafka主题并处理传入事件。

问题是,全局异常过滤器配置为希望捕获任何抛出异常,但没有捕获任何东西(我们最终以节点UnhandledPromiseRejection结束)

异常过滤器基本是这样配置的(遵循NestJS文档指南):

@Catch()
export class KafkaWorkerExceptionFilter implements ExceptionFilter {
private logger: AppLogger = new AppLogger(KafkaWorkerExceptionFilter.name);
catch(error: Error, host: ArgumentsHost): void {
this.logger.error('Uncaught exception', error);
}
}

我们的控制器是这样配置的:

@Controller()
export class KafkaWorkerController {
private readonly logger = new AppLogger(KafkaWorkerController.name);
constructor(
) {
this.logger.log('Init');
}
@EventPattern(KafkaTopic.PiiRemoval)
async removePiiForTalent(data: IncomingKafkaMessage): Promise<void> {
await asyncDoSomething();
throw new Error('Business logic failed');
}
}

现在,我们期望全局异常过滤器捕获从控制器处理函数内部抛出的错误(以及从嵌套在其中的同步/异步操作的实际函数抛出的实际错误)。这是不可能的。


再一次,跟随NestJS文档实现这样一个过滤器,我尝试了许多方法,以及各种方法的组合来"注册"过滤器,没有成功:

  • 列为顶级模块定义中的提供者{ provide: APP_FILTER, useClass: KafkaWorkerExceptionFilter }
  • 在控制器类
  • 上面使用@UseFilters(KafkaWorkerExceptionFilter)装饰器
  • 在使用app.connectMicroservice(...)和kafka配置之前/之后在main.ts文件上使用nest的app.useGlobalFilters(new KafkaWorkerExceptionFilter());

只是作为我们如何在"kafka-worker"中初始化应用的参考下面是main.ts文件

async function bootstrap() {
const app = await NestFactory.create(KafkaWorkerAppModule, {
logger: ['error', 'warn', 'debug', 'log', 'verbose'],
});
app.use(Helmet());
app.useGlobalPipes(
new ValidationPipe({
disableErrorMessages: false,
whitelist: true,
transform: true,
}),
);
const logger: AppLogger = new AppLogger('Bootstrap');
const config: ConfigService = app.get(ConfigService);
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
clientId: SECRET_VALUE,
brokers: [SECRET_HOST_ADDRESS],
ssl: true,
sasl: SOME_BOOLEAN_VALUE
? {
mechanism: 'plain',
username: SECRET_VALUE,
password: SECRET_VALUE,
}
: undefined,
},
consumer: {
allowAutoTopicCreation: false,
groupId: SECRET_VALUE,
},
},
});

await app.startAllMicroservices();
const port = config.servicePort || 3000;
await app.listen(port, () => {
logger.log(`Kafka Worker listening on port: ${port}`);
logger.log(`Environment: ${config.nodeEnv}`);
});
}
bootstrap();

当使用connectMicroservice()方法时,您正在创建一个混合应用程序。

默认情况下,混合型应用不会继承为主应用(基于http)配置的全局管道、拦截器、守卫和过滤器。要从主应用程序继承这些配置属性,请在connectMicroservice()调用的第二个参数(一个可选选项对象)中设置inheritAppConfig属性,如下所示:

const microservice = app.connectMicroservice({
transport: Transport.TCP
}, { inheritAppConfig: true });

所以在这种情况下你需要做的就是:

  1. 在这些方法之一中添加过滤器:
    1. 作为APP_FILTERKafkaWorkerAppModule
    2. main.ts
    3. 中使用app.useGlobalFilters(new KafkaWorkerExceptionFilter())作为全局滤波器
    4. 在每个相关提供者上使用@UseFilters(KafkaWorkerExceptionFilter)-我将避免使用全局过滤器
  2. inheritAppConfig选项添加到app.connectMicroservice()中的main.ts:
app.connectMicroservice({
transport: Transport.KAFKA,
options: {
client: {
clientId: SECRET_VALUE,
brokers: [SECRET_HOST_ADDRESS],
ssl: true,
sasl: SOME_BOOLEAN_VALUE
? {
mechanism: 'plain',
username: SECRET_VALUE,
password: SECRET_VALUE,
}
: undefined,
},
consumer: {
allowAutoTopicCreation: false,
groupId: SECRET_VALUE,
},
},
},
{ inheritAppConfig: true }
);

相关内容

  • 没有找到相关文章

最新更新