更新同一事务和不同goroutine中的多个表时发生Gorm错误



我有这个代码示例

err = transaction.WithTransaction(context.Background(), func(txCtx context.Context) error {
errorGroup := &errgroup.Group{}
errorGroup.Go(func() error {
return s.addTotable1(txCtx, *model)
})
errorGroup.Go(func() error {
return s.updateTable1(txCtx, *model)
})
errorGroup.Go(func() error {
return s.updateTable2(txCtx, *model)
})
errorGroup.Go(func() error {
return s.updateTable3(txCtx, *model)
})
errorGroup.Go(func() error {
return s.updateTable4(txCtx, *model)
})
errorGroup.Go(func() error {
return s.updateTable5(txCtx, *model)
})
if err := errorGroup.Wait(); err != nil {
transactionError = err
return err
}
}, func(trCtx context.Context) error {
return transactionError
})

此处定义了WithTransaction方法

type txKey struct{}
func injectTx(ctx context.Context, tx *gorm.DB) context.Context {
return context.WithValue(ctx, txKey{}, tx)
}
func ExtractTx(ctx context.Context) *gorm.DB {
if tx, ok := ctx.Value(txKey{}).(*gorm.DB); ok {
return tx
}
return nil
}
func WithTransaction(ctx context.Context, txFunc func(ctx context.Context) error, trFunc func(ctx context.Context) error) error {
gormConnect := postgresGorm.DbConnection
if gormConnect == nil {
return commonerrors.InternalServerError{
ErrorResponse: commonerrors.ErrorResponse{
Message: "error",
},
}
}
tx := gormConnect.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
if err := tx.Error; err != nil {
return err
}
err := txFunc(injectTx(ctx, tx))
if err != nil {
tx.Rollback()
tx = postgresGorm.DbConnection.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
err = trFunc(injectTx(ctx, tx))
tx.Commit()
return err
}
tx.Commit()
return nil
}

还有我们的PostgresDB 的Gorm配置

var (
DbConnection *gorm.DB = nil
)
const (
connectionFailedMsg = "postgres-gorm connection failed: %s"
)
// OpenConnection open postgres connection
func OpenConnection() {
// PostgreSQL Connection, uncomment to use.
// connection string format: user=USER password=PASSWORD host=/cloudsql/PROJECT_ID:REGION_ID:INSTANCE_ID/[ dbname=DB_NAME]
dbURI := fmt.Sprintf("host=%s%s port=%d user=%s "+"password=%s dbname=%s sslmode=disable",
configs.PostgresqlGormConfigs.CloudSqlPrefix, configs.PostgresqlGormConfigs.Host,
configs.PostgresqlGormConfigs.Port, configs.PostgresqlGormConfigs.User,
configs.PostgresqlGormConfigs.Password, configs.PostgresqlGormConfigs.DbName)
config := &gorm.Config{
NamingStrategy: schema.NamingStrategy{
TablePrefix:   configs.PostgresqlGormConfigs.TableGormPrefix,
SingularTable: true,
}}
var err error
DbConnection, err = gorm.Open(postgres.Open(dbURI), config)
if err != nil {
panic(err)
}
sqlDB, err := DbConnection.DB()
if err != nil {
log.Errorf(connectionFailedMsg, err)
panic(err)
}
if configs.PostgresqlGormConfigs.GormLoggin {
DbConnection.Config.Logger = gormLogger.Default.LogMode(gormLogger.Info)
}
err = sqlDB.Ping()
if err != nil {
log.Errorf(connectionFailedMsg, err)
} else {
log.Info("postgres-gorm connection successfully established")
}
}

这里有一个使用Gorm 更新表格的方法示例

func (o ServiceImpl) UpdateTable1(ctx context.Context, model *model) (*model, error) {
tx := transaction.ExtractTx(ctx)
injectedTransaction := true
if tx == nil {
tx = postgresGorm.DbConnection.Begin()
injectedTransaction = false
}
//Result
queryResult := tx.Save(&model)
// Error
if queryResult.Error != nil {
if !injectedTransaction {
tx.Rollback()
}
errResp := commonerrors.ErrorResponse{
Code:    "500",
Message: "Error",
}
return nil, commonerrors.InternalServerError{ErrorResponse: errResp}
}
if !injectedTransaction {
tx.Commit()
}
return shipDetail, nil
}

我们的问题是,多个goroutine发生了一些问题,我们在使用此服务时随机收到此错误:驱动程序:连接不好但它是完全随机的,第一次尝试总是成功的,之后失败一次,然后再次成功。。。你明白了。

我们已经尝试升级到最新版本的gorm和gorm-postgres驱动程序,但它没有改变任何事情。通过阅读gorm-doc,我们使用的所有方法都应该是线程安全的,所以我现在有点卡住了。如果我找到任何解决方案,我会更新帖子。谢谢

我发布了目前对我们有效的答案,以防将来有人遇到这个问题。主要问题是对所有线程重用相同的gorm.Session,所以我创建了一个通用的解决方法。

首先对WithTransaction函数进行了轻微修改:

func WithTransaction(ctx context.Context, txFunc func(ctx context.Context) error, trFunc func(ctx context.Context) error, nestedTx bool) error {
gormConnect := postgresGorm.DbConnection
if gormConnect == nil {
return commonerrors.InternalServerError{
ErrorResponse: commonerrors.ErrorResponse{
Message: "Error intentando conseguir la conexión con bbdd",
},
}
}
tx := gormConnect.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
if err := tx.Error; err != nil {
return err
}
err := txFunc(injectTx(ctx, tx))
if err != nil {
tx.Rollback()
tx = postgresGorm.DbConnection.Session(&gorm.Session{SkipDefaultTransaction: true}).Begin()
err = trFunc(injectTx(ctx, tx))
tx.Commit()
return err
}
if !nestedTx {
tx.Commit()
}
return nil
}

现在,如果我们有嵌套的事务,它将不会提交。

然后我添加了这些函数来处理errorGroup,并为每个Goroutine创建一个新的会话:

func RoutineTransaction(errorGroup *errgroup.Group, transactions chan<- *gorm.DB, subroutineFuncTx func(txCtx context.Context) error) {
errorGroup.Go(func() error {
var routineTxErr error
routineTxErr = WithTransaction(context.Background(), func(txCtx context.Context) error {
transactions <- ExtractTx(txCtx)
routineTxErr = subroutineFuncTx(txCtx)
return routineTxErr
}, func(trCtx context.Context) error {
return routineTxErr
}, true)
return routineTxErr
})

func RoutinesTransactionsCommit(transactions chan *gorm.DB) {
close(transactions)
for tx := range transactions {
tx.Commit()
}
func RoutinesTransactionsRollback(transactions chan *gorm.DB) {
close(transactions)
for tx := range transactions {
tx.Rollback()
}
}

然后你只需像这样在任何需要的地方使用它:

errorGroup := &errgroup.Group{}
transactions := make(chan *gorm.DB, 6)
transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
return s.addToTable1(txCtx, *model)
})
transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
return s.updateTable1(txCtx, *model)
})
transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
return s.updateTable2(txCtx, *model)
})
transaction.RoutineTransaction(errorGroup, transactions, func(txCtx context.Context) error {
return s.updateTable3(txCtx, *model)
})
if err := errorGroup.Wait(); err != nil {
transaction.RoutinesTransactionsRollback(transactions)
transactionError = err
return err
}
transaction.RoutinesTransactionsCommit(transactions)

最新更新