Nestjs:如何使用mongoose来启动事务会话



使用事务的mongoose文档很简单,但当在nestjs中遵循它时,它会返回一个错误:

Connection 0 was disconnected when calling `startSession`
MongooseError: Connection 0 was disconnected when calling `startSession`
at NativeConnection.startSession

我的代码:

const transactionSession = await mongoose.startSession();
transactionSession.startTransaction();
try
{
const newSignupBody: CreateUserDto = {password: hashedPassword, email, username};

const user: User = await this.userService.create(newSignupBody);
//save the profile.
const profile: Profile = await this.profileService.create(user['Id'], signupDto);
const result:AuthResponseDto = this.getAuthUserResponse(user, profile);
transactionSession.commitTransaction();
return result;
}
catch(err)
{
transactionSession.abortTransaction();
}
finally
{
transactionSession.endSession();
}

我在研究@nestjs/mongoose后找到了解决方案。这里的猫鼬没有连接。这就是返回错误的原因。

解决方案:

import {InjectConnection} from '@nestjs/mongoose';
import * as mongoose from 'mongoose';

在服务类的构造函数中,我们需要添加服务可以使用的连接参数。

export class AuthService {
constructor(
// other dependencies...
@InjectConnection() private readonly connection: mongoose.Connection){}

代替

const transactionSession = await mongoose.startSession();
transactionSession.startTransaction();

我们现在将使用:

const transactionSession = await this.connection.startSession();
transactionSession.startTransaction();

这样就可以解决startSession((之后断开连接的问题。

除了Noobish提供的答案外,我还想演示一个我在项目中使用的可重用函数:

import { ClientSession, Connection } from 'mongoose';
export const transaction = async <T>(connection: Connection, cb: (session: ClientSession) => Promise<T>): Promise<T> => {
const session = await connection.startSession();
try {
session.startTransaction();
const result = await cb(session);
await session.commitTransaction();
return result;
} catch (err) {
await session.abortTransaction();
throw err;
} finally {
await session.endSession();
}
}

然后可以使用,例如:

@Injectable()
export class MyService {
constructor(
@InjectModel(MyModel.name) private myModel: Model<MyModelDocument>,
@InjectConnection() private connection: Connection,
) {}
async find(id: string): Promise<MyModelDocument> {
return transaction(this.connection, async session => {
return this.myModel
.findOne(id)
.session(session);
});
}
async create(myDto: MyDto): Promise<MyModelDocument> {
return transaction(this.connection, async session => {
const newDoc = new this.myModel(myDto);
return newDoc.save({ session });
});
}

显然,上面的例子只是为了演示,其中不需要事务,因为操作已经是原子操作。然而,可以用一个更复杂的例子来扩展内部回调,在这个例子中,操作不是原子的,例如:

  1. 创建一个引用了另一个模式的文档的文档
  2. 查找引用的文档是否存在,如果存在,则将其添加到数组中(一对多关系(
  3. 如果引用的文档不存在,则引发错误。这将中止事务并回滚所有更改,例如删除在步骤1中创建的文档。尽管在所有这些步骤中都需要仔细指定session

以下是我所做的,以便以一种对我来说干净的方式管理nestjs中的猫鼬事务。

首先,创建一个简单的抽象类,以一种通用的方式指定应该可用于管理事务的方法

import { Injectable } from '@nestjs/common';
@Injectable()
export abstract class DbSession<T> {
public abstract start(): Promise<void>;
public abstract commit(): Promise<void>;
public abstract end(): Promise<void>;
public abstract abort(): Promise<void>;
public abstract get(): T | null;
}

然后你可以用mongoose 实现MongoDB的特殊功能

import { InjectConnection } from '@nestjs/mongoose';
import { Injectable } from '@nestjs/common';
import * as mongoose from 'mongoose';
import { RequestScope } from 'nj-request-scope';
import { DbSession } from '../../abstracts';
@Injectable()
@RequestScope()
export class MongoDbSession implements DbSession<mongoose.ClientSession> {
constructor(
@InjectConnection()
private readonly connection: mongoose.Connection
) {}
private session: mongoose.ClientSession | null = null;
public async start() {
if (this.session) {
if (this.session.inTransaction()) {
await this.session.abortTransaction();
await this.session.endSession();
throw new Error('Session already in transaction');
}
await this.session.endSession();
}
this.session = await this.connection.startSession();
this.session.startTransaction({
readConcern: { level: 'majority' },
writeConcern: { w: 'majority' },
readPreference: 'primary',
retryWrites: true,
});
}
public async commit() {
if (!this.session) {
throw new Error('Session not started');
}
await this.session.commitTransaction();
}
public async end() {
if (!this.session) {
throw new Error('Session not started');
}
await this.session.endSession();
}
public async abort() {
if (!this.session) {
throw new Error('Session not started');
}
await this.session.abortTransaction();
}
public get() {
return this.session;
}
}

我使用@RequestScope((是因为我希望我的singleton有一个请求作用域,但我不希望这个作用域扩展到所有提供者。你可以阅读更多关于这个讨论和nj请求范围README

然后您可以在拦截器中使用这个单例。

import {
CallHandler,
ExecutionContext,
Injectable,
Logger,
NestInterceptor,
} from '@nestjs/common';
import { Observable, tap } from 'rxjs';
import { DbSession } from '../abstracts/services';
@Injectable()
export class DbSessionInterceptor implements NestInterceptor {
constructor(private readonly dbSession: DbSession<unknown>) {}
private readonly logger = new Logger(DbSessionInterceptor.name);
async intercept(
_: ExecutionContext,
next: CallHandler
): Promise<Observable<any>> {
await this.dbSession.start();
return next.handle().pipe(
tap({
finalize: async () => {
await this.dbSession.commit();
await this.dbSession.end();
},
error: async (err: Error) => {
await this.dbSession.abort();
await this.dbSession.end();
this.logger.error(err);
throw err;
},
})
);
}
}

这样,如果您决定使用这个拦截器,那么您将在请求过程开始时自动创建一个事务,并在发送响应之前提交。

您可以使用控制器或解析器调用此拦截器。下面是一个装饰器,它自定义默认的GraphQL解析器以暗示数据库事务。

import { UseInterceptors, applyDecorators } from '@nestjs/common';
import { Resolver } from '@nestjs/graphql';
import { DbSessionInterceptor } from '../abstracts';
export function ResolverWithDbSession(resolverParams?: any) {
return applyDecorators(
Resolver(resolverParams),
UseInterceptors(DbSessionInterceptor)
);
}

如果您需要在请求生命周期之外使用事务(在onModuleInit()步骤中的fixture操作期间(,那么您就不能依赖注入器。因此,您可以创建另一个装饰器:

import { Inject, Logger } from '@nestjs/common';
import { DbSession } from '../abstracts';
export function WithSessionDb() {
const dbSessionInjector = Inject(DbSession);
return function decorator(
target: any,
_propertyKey: string,
descriptor: any // PropertyDescriptor
): void {
dbSessionInjector(target, 'dbSession');
const method = descriptor.value;
const logger = new Logger(`${WithSessionDb.name}#${method.name}`);
descriptor.value = async function wrapper(...args: any[]) {
try {
await this.dbSession.start();
const result = await method.apply(this, args);
await this.dbSession.commit();
return result;
} catch (error) {
await this.dbSession.abort();
logger.error(error);
throw error;
} finally {
await this.dbSession.end();
}
};
};
}

我已经将所有这些文件组织在一个名为";数据服务";。

import { Global, Module } from '@nestjs/common';
import { RequestScopeModule } from 'nj-request-scope';
import { MongoDbSession } from './mongo';
import { DbSession } from './abstracts';
@Global()
@Module({
imports: [RequestScopeModule],
providers: [
{
provide: DbSession,
useClass: MongoDbSession,
},
],
exports: [DbSession],
})
// eslint-disable-next-line @typescript-eslint/no-extraneous-class
export class DataServicesModule {}

在这里,在这个名为data-services.module.ts的文件中,我注入了MongoDB会话,而不是DbSession。

这个模块被声明为全局模块,所以我只需要在我的应用程序模块中导入它。

如果您需要在服务中使用会话,你只需要使用它的提供商:

// ...
import { DbSession } from '../../frameworks/data-services';
// ...
@Injectable()
export class SomeRandomService {
constructor(
@InjectModel(SomeRandomModel.name)
private readonly someRandomModel: Model<SomeRandomDocument>,
// ...
private readonly dbSession: DbSession<mongoose.ClientSession>
) {}

async countRandom() {
return this.someRandomModel.countDocuments().session(this.dbSession.get()).exec();
}
// ...
}

我希望这个解决方案会有所帮助。这看起来有些过头了,但这样做,你就不必再担心交易了🚀

最新更新