使用EF Core 2.1和SQL Server更正并发处理



我目前正在使用ASP开发API。NET核心Web API以及实体框架核心2.1和SQL Server数据库。API用于从A和B两个账户转账。考虑到B账户的性质,即接受付款的账户,可能会同时执行大量并发请求。正如你所知,如果管理不善,这可能会导致一些用户无法看到他们的付款到达。

花了好几天的时间试图实现并发,我不知道什么是最好的方法。为了简单起见,我创建了一个测试项目,试图重现这个并发问题。

在测试项目中,我有两个路由:request1和request2,每个路由都向同一个用户执行传输——第一个路由的数量为10,第二个路由为20。我在第一个上放了一个Thread.sleep(10000),如下所示:

[HttpGet]
[Route("request1")]
public async Task<string> request1()
{
using (var transaction = _context.Database.BeginTransaction(System.Data.IsolationLevel.Serializable))
{
try
{
Wallet w = _context.Wallets.Where(ww => ww.UserId == 1).FirstOrDefault();
Thread.Sleep(10000);
w.Amount = w.Amount + 10;
w.Inserts++;
_context.Wallets.Update(w);
_context.SaveChanges();
transaction.Commit();
}
catch (Exception ex)
{
transaction.Rollback();
}
}
return "request 1 executed";
}
[HttpGet]
[Route("request2")]
public async Task<string> request2()
{
using (var transaction = _context.Database.BeginTransaction(System.Data.IsolationLevel.Serializable))
{
try
{
Wallet w = _context.Wallets.Where(ww => ww.UserId == 1).FirstOrDefault();
w.Amount = w.Amount + 20;
w.Inserts++;
_context.Wallets.Update(w);
_context.SaveChanges();
transaction.Commit();
}
catch (Exception ex)
{
transaction.Rollback();
}
}
return "request 2 executed";
}

在浏览器中执行request1和request2之后,第一个事务被回滚,原因是:

InvalidOperationException: An exception has been raised that is likely due to a transient failure. Consider enabling transient error resiliency by adding 'EnableRetryOnFailure()' to the 'UseSqlServer' call.

我也可以重试事务,但没有更好的方法吗?使用锁?

可序列化,是最孤立的级别,也是最昂贵的级别,正如文档中所说:

在当前事务完成之前,任何其他事务都不能修改当前事务读取的数据。

这意味着没有其他事务可以更新另一个事务读取的数据,因为request2路由中的更新等待第一个事务(request1(提交,所以该事务在这里按预期工作。

这里的问题是,一旦当前交易读取了钱包行,我们就需要阻止其他交易读取,以解决我需要使用锁定的问题,这样当request1中的第一个select语句执行时,之后的所有交易都需要等待第一个交易完成,以便选择正确的值。由于EF Core不支持锁定,我需要直接执行SQL查询,所以在选择钱包时,我会在当前选择的行中添加一个行锁定

//this locks the wallet row with id 1
//and also the default transaction isolation level is enough
Wallet w = _context.Wallets.FromSql("select * from wallets with (XLOCK, ROWLOCK) where id = 1").FirstOrDefault();
Thread.Sleep(10000);
w.Amount = w.Amount + 10;
w.Inserts++;
_context.Wallets.Update(w);
_context.SaveChanges();
transaction.Commit();

现在,即使在执行了多个请求之后,这也能完美地工作——所有传输的结果都是正确的。除此之外,我使用了一个交易表,该表保存了每次转账的状态,以记录每次交易,以防出现问题。我可以使用该表计算所有钱包的金额。

现在有其他方法可以做到这一点,比如:

  • 存储过程:但我希望我的逻辑在应用程序级别
  • 制作一个同步的方法来处理数据库逻辑:通过这种方式,所有的数据库请求都在一个线程中执行,我读到一篇博客文章,建议使用这种方法,但也许我们会使用多个服务器来实现可伸缩性

我不知道我是否搜索得不好,但我找不到用实体框架核心处理悲观并发的好材料,即使在浏览Github时,我看到的大多数代码都不使用锁定。

这就引出了我的问题:这是正确的做法吗?

提前干杯并表示感谢。

我的建议是抓住DbUpdateConcurrencyException,并在重试逻辑中使用entry.GetDatabaseValues();entry.OriginalValues.SetValues(databaseValues);。无需锁定数据库。

以下是EF Core文档页面上的示例:

using (var context = new PersonContext())
{
// Fetch a person from database and change phone number
var person = context.People.Single(p => p.PersonId == 1);
person.PhoneNumber = "555-555-5555";
// Change the person's name in the database to simulate a concurrency conflict
context.Database.ExecuteSqlCommand(
"UPDATE dbo.People SET FirstName = 'Jane' WHERE PersonId = 1");
var saved = false;
while (!saved)
{
try
{
// Attempt to save changes to the database
context.SaveChanges();
saved = true;
}
catch (DbUpdateConcurrencyException ex)
{
foreach (var entry in ex.Entries)
{
if (entry.Entity is Person)
{
var proposedValues = entry.CurrentValues;
var databaseValues = entry.GetDatabaseValues();
foreach (var property in proposedValues.Properties)
{
var proposedValue = proposedValues[property];
var databaseValue = databaseValues[property];
// TODO: decide which value should be written to database
// proposedValues[property] = <value to be saved>;
}
// Refresh original values to bypass next concurrency check
entry.OriginalValues.SetValues(databaseValues);
}
else
{
throw new NotSupportedException(
"Don't know how to handle concurrency conflicts for "
+ entry.Metadata.Name);
}
}
}
}
}

例如,您可以使用redis的分布式锁定机制。此外,您可以通过userId进行锁定,它不会为其他人锁定方法。

为什么不处理代码中的并发问题,为什么它需要在DB层中?

你可以有一个方法,用给定的值更新给定钱包的值,你可以在那里使用简单的锁。像这样:

private readonly object walletLock = new object();
public void UpdateWalletAmount(int userId, int amount)
{
lock (balanceLock)
{
Wallet w = _context.Wallets.Where(ww => ww.UserId == userId).FirstOrDefault();
w.Amount = w.Amount + amount;
w.Inserts++;
_context.Wallets.Update(w);
_context.SaveChanges();
}
}

所以你的方法看起来是这样的:

[HttpGet]
[Route("request1")]
public async Task<string> request1()
{
try
{
UpdateWalletAmount(1, 10);
}
catch (Exception ex)
{
// log error
}
return "request 1 executed";
}
[HttpGet]
[Route("request2")]
public async Task<string> request2()
{
try
{
UpdateWalletAmount(1, 20);
}
catch (Exception ex)
{
// log error
}
return "request 2 executed";
}

您甚至不需要在这种上下文中使用事务。

相关内容

  • 没有找到相关文章

最新更新