SQL Threadsafe UPDATE TOP 1 for FIFO Queue



我有一个正在准备的发票表,然后准备打印。

[STATUS]列为草稿、打印、打印、打印

我需要获取要打印的第一条 (FIFO) 记录的 ID,并更改记录状态。该操作必须是线程安全的,以便另一个进程不会选择相同的InvoiceID

我可以这样做吗(对我来说看起来是原子的,但也许不是......

1:

WITH CTE AS
(
SELECT TOP(1) [InvoiceID], [Status]
FROM    INVOICES
WHERE   [Status] = 'Print'
ORDER BY [PrintRequestedDate], [InvoiceID] 
)
UPDATE CTE
SET [Status] = 'Printing'
, @InvoiceID = [InvoiceID]

。使用@InvoiceID...

UPDATE  INVOICES
SET [Status] = 'Printed'
WHERE   [InvoiceID] = @InvoiceID

或者我必须使用它(对于第一个语句)

阿拉伯数字:

UPDATE INVOICES
SET    [Status] = 'Printing'
, @InvoiceID = [InvoiceID]
WHERE  [InvoiceID] = 
(
SELECT TOP 1 [InvoiceID]
FROM    INVOICES WITH (UPDLOCK)
WHERE   [Status] = 'Print'
ORDER BY [PrintRequestedDate], [InvoiceID] 
)

。使用@InvoiceID执行操作...等。

(我无法将事务从更改状态更改为"正在打印",直到流程结束,即状态最终更改为"已打印"时)。

编辑:

万一重要,数据库是READ_COMMITTED_SNAPSHOT

我可以将更新状态为"正在打印"并获取 ID 的事务。但我不能继续保持交易打开状态,直到将状态更改为"已打印"。这是一个 SSRS 报告,它对 SQL 进行几次不同的查询以获取发票的各种位,它可能会崩溃/其他什么,使事务保持打开状态。

@Gordon Linoff"如果你想要一个队列" FIFO序列并不重要,我只想先打印首先请求的发票......"或多或少"(不想要任何不必要的复杂性...

@Martin Smith"看起来像一个普通的表作为队列要求" - 是的,正是如此,感谢您提供非常有用的链接。

溶液:

我采用的解决方案来自评论:

@lad2025向我指出了使用WITH (ROWLOCK, READPAST, UPDLOCK)的SQL Server进程队列争用条件,@MartinSmith解释了隔离问题是什么,并向我指出了将表用作队列 - 它准确地谈到了我正在尝试做的事情。

我还没有理解为什么UPDATE TOP 1是安全的,而UPDATE MyTable SET xxx = yyy WHERE MyColumn = (SELECT TOP 1 SomeColumn FROM SomeTable ORDER BY AnotherColumn)(没有隔离提示)不是,我应该教育自己,但我很高兴只是把隔离提示放在我的代码中,然后继续做其他事情:)

感谢您的所有帮助。

我担心的是重复的 [发票 ID] 同一 [发票 ID]
的多个打印请求

在第一次更新时,一行set [Status] = 'Printing'

在第二次更新时,所有 [InvoiceID] 行都会set [Status] = 'Printed'
这甚至可以设置状态 ="草稿"的行

也许这就是你想要的

另一个进程可以在set [Status] = 'Print'之前选取相同的 [发票 ID]

所以有些副本会打印,有些不会

我对使用update lock发表评论

这是不确定的,但你可以拿top (1)并跳过order by。 您往往会获得最新的行,但不能保证。 如果您清除队列,那么您将得到所有队列。

这表明您可能会丢失"草稿" = 1

declare @invID int; 
declare @T table (iden int identity primary key, invID int, status tinyint);
insert into @T values (1, 2), (5, 1), (3, 1), (4, 1), (4, 2), (2, 1), (1, 1), (5, 2), (5, 2);
declare @iden int;
select * from @t order by iden;
declare @rowcount int = 1; 
while (@ROWCOUNT > 0)
begin
update top (1) t 
set t.status = 3, @invID = t.invID,  @iden = t.iden
from @t t 
where t.status = '2';
set @rowcount = @@ROWCOUNT;
if(@rowcount > 0)
begin 
select @invID, @iden;
-- do stuff  
update t 
set t.status = 4
from @t t
where t.invID = @invID; -- t.iden = @iden;
select * from @T order by iden;
end
end

单个语句的原子性

我认为你的代码很好。 即,因为您有一个语句,一旦语句运行,状态就会更新为printing状态;因此,在搜索print之前运行的任何内容都会在您的进程看到它之前将同一记录更新为printing;因此,您的进程将选择后续记录,或者在您的语句运行后击中它的任何进程都会将其视为printing因此不会拾取它。 实际上,没有一种情况表明,记录可以在语句运行时拾取它,因为正如所讨论的那样,单个SQL语句应该是原子的。

免責聲明

也就是说,我没有足够的专家来说明显式锁定提示是否有帮助;在我看来,它们不是必需的,因为上面是原子的,但评论中的其他人可能比我更了解情况。 但是,运行测试(尽管数据库和两个线程都在同一台机器上运行)我无法创建竞争条件......也许如果客户端在不同的机器上/如果有更多的并发性,你更有可能看到问题。

我希望其他人对你的问题有不同的解释,因此存在分歧。

试图反驳自己

这是我用来尝试引起争用条件的代码;你可以把它放到LINQPad 5,选择语言C# Program,根据需要调整连接字符串(以及可选的任何语句),然后运行:

const long NoOfRecordsToTest = 1000000;
const string ConnectionString = "Server=.;Database=Play;Trusted_Connection=True;";  //assumes a database called "play"
const string DropFifoQueueTable = @"
if object_id('FIFOQueue') is not null 
drop table FIFOQueue";
const string CreateFifoQueueTable = @"
create table FIFOQueue 
(
Id bigint not null identity (1,1) primary key clustered
, Processed bit default (0) --0=queued, null=processing, 1=processed
)";
const string GenerateDummyData = @"
with cte as
(
select 1 x
union all
select x + 1
from cte
where x < @NoRowsToGenerate
)
insert FIFOQueue(processed)
select 0
from cte
option (maxrecursion 0)
";
const string GetNextFromQueue = @"
with singleRecord as
(
select top (1) Id, Processed
from FIFOQueue --with(updlock, rowlock, readpast) --optionally include this per comment discussions
where processed = 0
order by Id
)
update singleRecord
set processed = null
output inserted.Id";
//we don't really need this last bit for our demo; I've included in case the discussion turns to this..
const string MarkRecordProcessed = @"
update FIFOQueue
set Processed = 1
where Id = @Id";
void Main()
{
SetupTestDatabase();
var task1 = Task<IList<long>>.Factory.StartNew(() => ExampleTaskForced(1));
var task2 = Task<IList<long>>.Factory.StartNew(() => ExampleTaskForced(2));
Task.WaitAll(task1, task2);
foreach (var processedByBothThreads in task1.Result.Intersect(task2.Result))
{
Console.WriteLine("Both threads processed id: {0}", processedByBothThreads);
}
Console.WriteLine("done");
}
static void SetupTestDatabase()
{
RunSql<int>(new SqlCommand(DropFifoQueueTable), cmd => cmd.ExecuteNonQuery());
RunSql<int>(new SqlCommand(CreateFifoQueueTable), cmd => cmd.ExecuteNonQuery());
var generateData = new SqlCommand(GenerateDummyData);
var param = generateData.Parameters.Add("@NoRowsToGenerate",SqlDbType.BigInt);
param.Value = NoOfRecordsToTest;
RunSql<int>(generateData, cmd => cmd.ExecuteNonQuery());
}
static IList<long> ExampleTaskForced(int threadId) => new List<long>(ExampleTask(threadId)); //needed to ensure prevent lazy loadling from causing issues with our tests
static IEnumerable<long> ExampleTask(int threadId)
{
long? x;
while ((x = ProcessNextInQueue(threadId)).HasValue)
{
yield return x.Value;
}
//yield return 55; //optionally return a fake result just to prove that were there a duplicate we'd catch it
}
static long? ProcessNextInQueue(int threadId)
{
var id = RunSql<long?>(new SqlCommand(GetNextFromQueue), cmd => (long?)cmd.ExecuteScalar());
//Debug.WriteLine("Thread {0} is processing id {1}", threadId, id?.ToString() ?? "[null]"); //if you want to see how we're doing uncomment this line (commented out to improve performance / increase the likelihood of a collision
/* then if we wanted to do the second bit we could include this
if(id.HasValue) {
var markProcessed = new SqlCommand(MarkRecordProcessed);
var param = markProcessed.Parameters.Add("@Id",SqlDbType.BigInt);
param.Value = id.Value;
RunSql<int>(markProcessed, cmd => cmd.ExecuteNonQuery());
}
*/
return id;
}
static T RunSql<T>(SqlCommand command, Func<SqlCommand,T> callback)
{
try
{
using (var connection = new SqlConnection(ConnectionString))
{    
command.Connection = connection;
command.Connection.Open();
return (T)callback(command);
}
}
catch (Exception e)
{
Debug.WriteLine(e.ToString());
throw;
}
}

其他评论

上面的讨论实际上只是在谈论多个线程从队列中获取下一条记录,同时避免任何单个记录被多个线程拾取。 还有其他几点...

SQL 之外的争用条件

根据我们的讨论,如果FIFO是强制性的,还有其他事情需要担心。 即,虽然您的线程将按顺序拾取每条记录,但这取决于它们。例如Thread 1获得记录10然后Thread 2获得记录11。 现在,Thread 2Thread 1先将记录11发送到打印机,然后再发送记录10。如果它们要发送到同一台打印机,您的打印将出现故障。 如果它们是不同的打印机,这不是问题;任何打印机上的所有打印都是连续的。 我将假设后者。

异常处理

如果在正在处理某些内容的线程中发生任何异常(即线程的记录printing),则应考虑如何处理此问题。 一种选择是保持该线程重试;尽管如果这是一些根本性错误,这可能是不确定的。 另一种方法是将记录置于某种error状态,以便由另一个进程处理/接受此记录不会按顺序显示。 最后,如果队列中发票的顺序是理想的,而不是硬性要求,则可以让拥有线程将状态恢复为print,以便它或其他线程可以选取该记录重试(尽管同样,如果记录存在根本性错误,这可能会阻塞队列)。

我在这里的建议是error状态;这样你就对这个问题有更多的了解/可以专门用于处理问题的另一个过程。

碰撞处理

另一个问题是,由于对printing的更新不在事务中保留,如果服务器崩溃,您将此状态的记录保留在数据库中,并且当您的系统重新联机时,它将被忽略。 避免这种情况的方法是包含一个列,说明哪个线程正在处理它;这样,当系统恢复时,该线程可以从中断的地方恢复,或者包括一个日期戳,以便在一段时间后,任何具有状态printing"超时"的记录都可以根据需要扫描/重置为ErrorPrint状态。

WITH CTE AS
(
SELECT TOP(1) [InvoiceID], [Status], [ThreadId]
FROM    INVOICES
WHERE   [Status] = 'Print'
OR     ([Status] = 'Printing' and [ThreadId] = @ThreadId) --handle previous crash
ORDER BY [PrintRequestedDate], [InvoiceID] 
)
UPDATE CTE
SET [Status] = 'Printing'
, [ThreadId] = @ThreadId
OUTPUT Inserted.[InvoiceID]

对其他过程的认识

我们主要关注打印元素;但其他进程也可能与您的Invoices表交互。 我们可以假设,除了创建初始Draft记录并在准备好打印后将其更新为Print之外,这些过程不会触及Status字段。 但是,相同的记录可能会被完全不相关的进程锁定。 如果我们想确保 FIFO,我们不得使用ReadPast提示,因为某些记录可能具有状态Print但被锁定,因此尽管它们具有较早的PrintRequestedDate,但我们会跳过它们。 但是,如果我们希望尽快打印内容,并且在不方便的情况下将它们按顺序排列,包括ReadPast将允许我们的打印过程跳过锁定的记录并继续,一旦它们被释放,就会回来处理它们。

同样,另一个进程可能会在记录处于Printing状态时锁定我们的记录,因此我们无法更新它以将其标记为完成。 同样,如果我们想避免这种情况导致滞留,我们可以利用ThreadId列来允许我们的线程将记录保留在状态Printing,并在稍后未锁定时返回清理它。 显然,这假设ThreadId列仅由我们的打印过程使用。

具有专用的打印队列表

为了避免不相关的进程锁定发票的一些问题,请将Status字段移动到其自己的表中;因此您只需从invoices表中读取;而不需要更新它。

这也将具有以下优点:(如果您不关心打印历史记录)可以在完成后删除记录,因此您将获得更好的性能(因为您不必搜索整个发票表来查找准备打印的发票)。 也就是说,这个还有另一种选择(如果你在SQL2008或以上)。

使用筛选索引

由于 Status 列将多次更新,因此它不适合索引;即,随着状态的进展,记录在索引中的位置会从一个分支跳到另一个分支。 但是,由于我们正在对其进行过滤,因此拥有索引也会真正受益。 为了解决这个矛盾,一种选择是使用过滤索引;即只索引我们感兴趣的打印过程记录;因此,我们维护一个小指数以获得大收益。

create nonclustered index ixf_Invoices_PrintStatusAndDate  
on dbo.Invoices ([Status], [PrintRequestedDate])
include ([InvoiceId]) --just so we don't have to go off to the main table for this, but can get all we need form the index
where [Status] in ('Print','Printing') 

使用"枚举"/参考表

我怀疑您的示例使用字符串来保持演示代码简单,但是为了完整性而包括它。 在数据库中使用字符串可能会使事情难以支持。 不要让状态是字符串值,而是使用相关状态表中的 ID。

create table Statuses
(
ID smallint not null primary key clustered --not identity since we'll likely mirror this enum in code / may want to have defined ids
,Name
)   
go
insert Statuses 
values 
(0, 'Draft')
, (1,'Print')
, (2,'Printing')
, (3,'Printed')
create table Invoices 
(
--...
, StatusId smallint foreign key references Statuses(Id)
--...
)

最新更新