我们正在尝试构建我们自己的Kafka集成。我们使用装饰器捕获对函数的引用,然后当从Kafka接收到新消息时,尝试在其他地方触发它
修饰符:
export const SUBSCRIBER_FN_REF_MAP = new Map();
export const SUBSCRIBER_OBJ_REF_MAP = new Map();
export function SubscribeTo(topic) {
return (target, propertyKey, descriptor) => {
const originalMethod = target[propertyKey];
SUBSCRIBER_FN_REF_MAP.set(topic, originalMethod);
SUBSCRIBER_OBJ_REF_MAP.set(topic, target);
return descriptor;
};
}
当我们在Service中注入一些存储库时,上下文总是未定义的:
@Injectable()
export class KafkaEventsListenerService {
constructor(
private readonly messageUserRepository: MessageUserRepository
) {
}
@SubscribeTo(DANCEFLAVORSAPI_USER_CREATED_TOPIC)
async onUserCreated(payload: string): Promise<void> {
console.log('[KafkaConsumer receiving ' + DANCEFLAVORSAPI_USER_CREATED_TOPIC + ']: ', payload);
console.log(await this.messageUserRepository.findAll());
}
}
@Injectable()
export class MessageUserRepository {
constructor(
@InjectRepository(MessageUserEntity)
private readonly repository: Repository<MessageUserEntity>
) {}
async findAll(): Promise<MessageUserEntity[]>{
return await this.repository.find();
}
}
存储库函数调用导致错误:
无法读取未定义属性'findAll'
当我删除装饰器时,它工作了。
有人能帮我一下吗?根据您的装饰器的代码示例,看起来您正在尝试捕获对类方法的引用,然后在其他地方(在NestJS生命周期之外)调用它。
decorator会立即运行,在NestJS依赖注入容器中所有东西都被启动之前,所以类的构造函数参数不会被解析或可用。
我建议你重新考虑你的装饰器的架构,把东西移到你的应用程序的onModuleInit
中,以正确地发现和绑定你的Kafka处理程序。
我构建了@golevelup/nestjs-rabbitmq包,它做的事情和你在这里想要完成的一样,但是使用rabbitmq而不是Kafka。
你应该采取的基本步骤是:
- 不捕获对原始函数的引用,而是使用修饰器将元数据附加到处理程序
- 在onModuleInit期间,你可以找到/发现所有用
SubscribeTo
装饰的方法 使用发现的方法,你可以建立一个外部上下文和路由消息
@golevelup/nestjs-discovery包可以很容易地扫描所有提供程序和它们的方法来查找装饰器元数据(免责声明,我是作者)