我们在windows服务的多线程环境中处理单个文件,每10秒就会有一个新线程开始处理未处理的文件。
await Task.Run(()=> ProcessFilesFromDatabase());
有时我们会遇到这样的问题,比如两个线程读取相同的文件,我们会遇到重复的文件被处理。当我们在线检查如何在LINQ to Entities中克服这种情况时,我们建议使用事务作用域并读取已提交的using (var context = new Context())
{
var transActionOption1 = new TransactionOptions();
transActionOption1.IsolationLevel = System.Transactions.IsolationLevel.ReadCommitted;
using (TransactionScope tran = new TransactionScope(TransactionScopeOption.Required, transActionOption1))
{
try
{
var fileInfo = context.File.Where(a=>!a.IsRead).ToList();
foreach (FileDetails fileDetail in fileInfo)
{
fileDetail.IsRead = true;
context.Entry(fileDetail).State = EntityState.Modified;
context.savechanges();
}
tran.Complete();
}
//Process each file after updating read
}
}
但是线程1仍然读取了5条记录,并且在更新回数据库之前使用标记IsRead - true,线程2读取了相同的5条记录
我们认为在ReadCommitted范围内,任何未提交的记录都不会被其他线程读取。
谢谢,
热带雨林
这个设计有几个问题:
- EF和任何ORM都不适合这样的查询。对每一行调用
SaveChanges
增加碰撞的机会。 - 在同一个线程上轮询和处理数据几乎可以保证从一个作业执行到下一个作业执行会发生冲突
获取更改
在SQL Server中,可以使用OUTPUT子句返回修改过的数据,例如:
UPDATE Files
Set IsRead=1
OUTPUT inserted.*
where IsRead=0;
在EF Core中,可以将结果映射到具有FromSqlRaw
的实体,例如:
List<File> GetNewFiles()
{
var sql=@"UPDATE Files
Set IsRead=1
OUTPUT inserted.*
where IsRead=0;";
using(var context=new MyContext())
{
var newFiles=context.Files.FromSqlRaw(sql)...ToList();
return newFiles;
}
}
EF仅用于加载此处的数据。使用像Dapper这样的microORM来执行查询并直接映射结果可能更简单
List<File> GetNewFiles(string connectionString)
{
var sql=@"UPDATE Files
Set IsRead=1
OUTPUT inserted.*
where IsRead=0;";
using(var connection=new SqlConnection(connectionString))
{
var newFiles=connection.Query<File>(sql).ToList();
return newFiles;
}
}
将轮询与处理分开
为了避免冲突,轮询和处理步骤应该分开。一种方法是将投票结果发布到例如ActionBlock或Channel,并让一个worker处理它们。
假设处理方法为ProcessFiles(List<File>)
。ActionBlock可以用来处理任何通过该方法提交给它的文件。
var block=new ActionBlock<List<File>>(files=>ProcessFiles(files));
计时器可用于轮询数据库并将任何新文件发布到块
var timer=new Timer(()=>{
var files=GetNewFiles(connectionString);
block.Post(files)
});
timer.Change(0,10000);
这足以开始轮询数据库并在另一个数据库中处理文件。
要停止处理,首先我们停止计时器,然后我们告诉块停止并等待它完成:
timer.Dispose();
block.Complete();
await block.Completion;