Executing a lock in order (using ConcurrentQueue and lock)



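I have an endpoint in my API that processes data and uses a lock. Because many users use the system, requests are sometimes not processed in order, and a timeout may be thrown when many requests are waiting. We have tried two different approaches. The first one is this class: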

public class QueuedFunctions<T>
{
    private readonly object _internalSyncronizer = new object();
    private readonly ConcurrentQueue<Func<T>> _funcQueue = new ConcurrentQueue<Func<T>>();

    public T Execute(Func<T> func)
    {
        _funcQueue.Enqueue(func);
        Console.WriteLine("Queuing: " + Thread.CurrentThread.ManagedThreadId);
        Console.WriteLine(Monitor.IsEntered(_internalSyncronizer));
        lock (_internalSyncronizer)
        {
            Console.WriteLine("Locked: " + Thread.CurrentThread.ManagedThreadId);
            Func<T> nextFunc;
            if (_funcQueue.TryDequeue(out nextFunc))
            {
                return nextFunc();
            }
            else
            {
                throw new Exception("Something is wrong. How come there is nothing in the queue?");
            }
        }
    }
}

I tested it with the following unit test:

var threadsQuantity = 50;
for (int i = 0; i < threadsQuantity; i++)
{
    var t = new Thread(async () =>
    {
        Console.WriteLine("Started Thread: " + Thread.CurrentThread.ManagedThreadId);
        await queuedActions.Execute(() =>
        {
            Console.WriteLine("Thread: " + Thread.CurrentThread.ManagedThreadId);
            return Task.FromResult<object>(null);
        });
    });
    t.Start();
}

The output is as follows:

Started Thread: 16
Started Thread: 13
Started Thread: 15
Started Thread: 17
Started Thread: 14
Started Thread: 18
Started Thread: 19
Started Thread: 20
Started Thread: 21
Queuing: 21
Locked: 21
Queuing: 13
Queuing: 18
Queuing: 15
Queuing: 16
Queuing: 17
Queuing: 14
Queuing: 19
Queuing: 20
Started Thread: 22
Queuing: 22
Thread: 21
Locked: 13
Thread: 13
Locked: 15
Started Thread: 23
Thread: 15
Queuing: 23
Locked: 23
Thread: 23
Locked: 18
Thread: 18
Locked: 17
Thread: 17
Locked: 16
Thread: 16
Locked: 14
Thread: 14
Locked: 19
Thread: 19
Locked: 20
Thread: 20
Locked: 22
Thread: 22
Started Thread: 24
Queuing: 24
Locked: 24
Thread: 24
Started Thread: 25
Queuing: 25
Locked: 25
Thread: 25
Started Thread: 26
Queuing: 26
Locked: 26
Thread: 26
Started Thread: 27
Queuing: 27
Locked: 27
Thread: 27
Started Thread: 28
Queuing: 28
Locked: 28
Thread: 28
Started Thread: 29
Queuing: 29
Locked: 29
Thread: 29
Started Thread: 30
Queuing: 30
Locked: 30
Thread: 30
Started Thread: 31
Queuing: 31
Locked: 31
Thread: 31
Started Thread: 32
Queuing: 32
Locked: 32
Thread: 32
Started Thread: 33
Queuing: 33
Locked: 33
Thread: 33
Started Thread: 34
Queuing: 34
Locked: 34
Thread: 34
Started Thread: 35
Queuing: 35
Locked: 35
Thread: 35
Started Thread: 36
Queuing: 36
Locked: 36
Thread: 36
Started Thread: 37
Queuing: 37
Locked: 37
Thread: 37
Started Thread: 38
Queuing: 38
Locked: 38
Thread: 38
Started Thread: 39
Queuing: 39
Locked: 39
Thread: 39
Started Thread: 40
Queuing: 40
Locked: 40
Thread: 40
Started Thread: 41
Queuing: 41
Locked: 41
Thread: 41
Started Thread: 42
Queuing: 42
Locked: 42
Thread: 42
Started Thread: 43
Queuing: 43
Locked: 43
Thread: 43
Started Thread: 44
Queuing: 44
Locked: 44
Thread: 44
Started Thread: 45
Queuing: 45
Locked: 45
Thread: 45
Started Thread: 46
Queuing: 46
Locked: 46
Thread: 46
Started Thread: 47
Queuing: 47
Locked: 47
Thread: 47
Started Thread: 48
Queuing: 48
Locked: 48
Thread: 48
Started Thread: 49
Queuing: 49
Locked: 49
Thread: 49
Started Thread: 50
Queuing: 50
Locked: 50
Thread: 50
Started Thread: 51
Queuing: 51
Locked: 51
Thread: 51
Started Thread: 52
Queuing: 52
Locked: 52
Thread: 52
Started Thread: 53
Queuing: 53
Locked: 53
Thread: 53
Started Thread: 54
Queuing: 54
Locked: 54
Thread: 54
Started Thread: 55
Queuing: 55
Locked: 55
Thread: 55
Started Thread: 56
Queuing: 56
Locked: 56
Thread: 56
Started Thread: 57
Queuing: 57
Locked: 57
Thread: 57
Started Thread: 58
Queuing: 58
Locked: 58
Thread: 58
Started Thread: 59
Queuing: 59
Locked: 59
Thread: 59
Started Thread: 60
Queuing: 60
Locked: 60
Thread: 60
Started Thread: 61
Queuing: 61
Locked: 61
Thread: 61
Started Thread: 62
Queuing: 62
Locked: 62
Thread: 62

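As you can see, it queues thread 21 and locks thread 21, then queues thread 13 and locks thread 13, queues thread 18, and then locks thread 15 as the next thread to process, although thread 18 was queued ahead of it.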

In addition, we tested another class that we found on this forum:

public sealed class QueuedLock
{
    private class SyncObject : IDisposable
    {
        private Action m_action = null;
        private static readonly object locker = new object();

        public SyncObject(Action action)
        {
            m_action = action;
        }

        public void Dispose()
        {
            lock (locker)
            {
                var action = m_action;
                m_action = null;
                action?.Invoke();
            }
        }
    }

    private static readonly object m_innerLock = new Object();
    private volatile int m_ticketsCount = 0;
    private volatile int m_ticketToRide = 1;

    public bool Enter()
    {
        if (Monitor.IsEntered(m_innerLock))
        {
            return false;
        }
        var myTicket = Interlocked.Increment(ref m_ticketsCount);
        Monitor.Enter(m_innerLock);
        Console.WriteLine("Locked: " + Thread.CurrentThread.ManagedThreadId);
        while (true)
        {
            if (myTicket == m_ticketToRide)
            {
                return true;
            }
            Monitor.Wait(m_innerLock);
        }
    }

    public void Exit()
    {
        Interlocked.Increment(ref m_ticketToRide);
        Monitor.PulseAll(m_innerLock);
        Monitor.Exit(m_innerLock);
    }

    public IDisposable GetLock()
    {
        if (Enter())
        {
            return new SyncObject(Exit);
        }
        return new SyncObject(null);
    }
}

Test method:

static QueuedLock queuedLock = new QueuedLock();

[TestMethod]
public void QueuedLockTest_ShouldRunInOrder()
{
    var threadsQuantity = 50;
    for (int i = 0; i < threadsQuantity; i++)
    {
        var t = new Thread(() =>
        {
            Console.WriteLine("Started Thread: " + Thread.CurrentThread.ManagedThreadId);
            try
            {
                queuedLock.Enter();
                Console.WriteLine("Thread: " + Thread.CurrentThread.ManagedThreadId);
            }
            finally
            {
                queuedLock.Exit();
            }
        });
        t.Start();
    }
}

And the output is not in order either:

Started Thread: 15
Started Thread: 14
Started Thread: 13
Started Thread: 16
Started Thread: 17
Started Thread: 18
Locked: 18
Started Thread: 19
Thread: 18
Locked: 15
Thread: 15
Locked: 14
Locked: 13
Locked: 19
Thread: 19
Locked: 16
Thread: 14
Locked: 17
Thread: 13
Thread: 16
Thread: 17
Started Thread: 20
Locked: 20
Thread: 20
Started Thread: 21
Locked: 21
Thread: 21
Started Thread: 22
Locked: 22
Thread: 22
Started Thread: 23
Locked: 23
Thread: 23
Started Thread: 24
Locked: 24
Thread: 24
Started Thread: 25
Locked: 25
Thread: 25
Started Thread: 26
Locked: 26
Thread: 26
Started Thread: 27
Locked: 27
Thread: 27
Started Thread: 28
Locked: 28
Thread: 28
Started Thread: 29
Locked: 29
Thread: 29
Started Thread: 30
Locked: 30
Thread: 30
Started Thread: 31
Locked: 31
Thread: 31
Started Thread: 32
Locked: 32
Thread: 32
Started Thread: 33
Locked: 33
Thread: 33
Started Thread: 34
Locked: 34
Thread: 34
Started Thread: 35
Locked: 35
Thread: 35
Started Thread: 36
Locked: 36
Thread: 36
Started Thread: 37
Locked: 37
Thread: 37
Started Thread: 38
Locked: 38
Thread: 38
Started Thread: 39
Locked: 39
Thread: 39
Started Thread: 40
Locked: 40
Thread: 40
Started Thread: 41
Locked: 41
Thread: 41
Started Thread: 42
Locked: 42
Thread: 42
Started Thread: 43
Locked: 43
Thread: 43
Started Thread: 44
Locked: 44
Thread: 44
Started Thread: 45
Locked: 45
Thread: 45
Started Thread: 46
Locked: 46
Thread: 46
Started Thread: 47
Locked: 47
Thread: 47
Started Thread: 48
Locked: 48
Thread: 48
Started Thread: 49
Locked: 49
Thread: 49
Started Thread: 50
Locked: 50
Thread: 50
Started Thread: 51
Locked: 51
Thread: 51
Started Thread: 52
Locked: 52
Thread: 52
Started Thread: 53
Locked: 53
Thread: 53
Started Thread: 54
Locked: 54
Thread: 54
Started Thread: 55
Locked: 55
Thread: 55
Started Thread: 56
Locked: 56
Thread: 56
Started Thread: 57
Locked: 57
Thread: 57
Started Thread: 58
Locked: 58
Thread: 58
Started Thread: 59
Locked: 59
Thread: 59
Started Thread: 60
Locked: 60
Thread: 60
Started Thread: 61
Locked: 61
Thread: 61
Started Thread: 62
Locked: 62
Thread: 62

Can you help me make it work? I can't spot the bug in these functions, and they seem to work for other people.

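It seems that your test doesn't generate usable information for proving or disproving the correct order of execution. Thread.CurrentThread.ManagedThreadId is just an ID, not a reliable indicator of order. Also, writing to the Console while the program is running may affect the order of execution, because all access to the static Console class is synchronized.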
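One way to keep Console out of the measurement is to collect events in memory and print them only after all threads have finished. The sketch below is just an illustration of that idea: the names EventLog, Record and Snapshot are made up for this example and are not part of your code. Note that the sequence number only tells you the order in which threads reached Record, so it is still an approximation of what happened around it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;

// Hypothetical helper: collects events in memory instead of writing to Console.
public sealed class EventLog
{
    private readonly ConcurrentQueue<(int Seq, string Label)> _events =
        new ConcurrentQueue<(int Seq, string Label)>();
    private int _seq = 0;

    // Stamps the event with the next sequence number, stores it, and returns the stamp.
    public int Record(string label)
    {
        int seq = Interlocked.Increment(ref _seq);
        _events.Enqueue((seq, label));
        return seq;
    }

    // Returns a copy of the recorded events, ordered by sequence number.
    public (int Seq, string Label)[] Snapshot() =>
        _events.OrderBy(e => e.Seq).ToArray();
}

public static class EventLogDemo
{
    public static void Main()
    {
        var log = new EventLog();
        var threads = Enumerable.Range(1, 5).Select(i =>
        {
            var t = new Thread(() => log.Record($"request {i} started"));
            t.Start();
            return t;
        }).ToArray();
        foreach (var t in threads) t.Join();

        // Console is touched only after every thread has finished.
        foreach (var e in log.Snapshot()) Console.WriteLine($"{e.Seq}: {e.Label}");
    }
}
```

The order of the printed lines can differ between runs, which is expected: nothing here forces the threads to call Record in any particular order.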

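The test below uses a (modified) custom QueuedLock class to enforce the order of execution, and it consistently reports FIFO behavior. I added a Thread.Sleep(20) before starting each thread, to avoid having two threads request a ticket at almost the same time. There is also a Thread.Sleep(40) inside the ThreadStart delegate, to ensure that all threads will be blocked before entering the QueuedLock (except for the first thread of each batch).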

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class Program
{
    public static void Main()
    {
        QueuedLock queuedLock = new QueuedLock();
        foreach (var batch in Enumerable.Range(1, 10))
        {
            var sequence = Enumerable.Range(1, 10);
            var tickets = new List<int>();
            var threads = sequence.Select(item =>
            {
                // Stagger the starts so two threads don't request a ticket at almost the same time.
                Thread.Sleep(20);
                var thread = new Thread(() =>
                {
                    queuedLock.Enter();
                    // Hold the lock for 40 ms so the later threads pile up behind it.
                    try { tickets.Add(item); Thread.Sleep(40); }
                    finally { queuedLock.Exit(); }
                });
                thread.Start();
                return thread;
            }).ToArray();
            foreach (var thread in threads) thread.Join();
            Console.WriteLine($"Batch #{batch}, tickets: {String.Join(", ", tickets)} - "
                + (tickets.SequenceEqual(sequence) ? "OK" : "NOT FIFO!"));
        }
    }
}

public class QueuedLock
{
    private readonly object _locker = new object();
    private int _ticketsCount = 0;
    private int _ticketToRide = 1;

    public void Enter()
    {
        // Reentrancy is not supported.
        if (Monitor.IsEntered(_locker)) throw new InvalidOperationException();
        int myTicket = Interlocked.Increment(ref _ticketsCount);
        Monitor.Enter(_locker);
        // Wait releases _locker while blocked, so other threads can enter and check their ticket.
        while (myTicket != _ticketToRide) Monitor.Wait(_locker);
    }

    public void Exit()
    {
        if (!Monitor.IsEntered(_locker)) throw new InvalidOperationException();
        // Still holding _locker here, so a plain increment is safe.
        _ticketToRide++;
        Monitor.PulseAll(_locker);
        Monitor.Exit(_locker);
    }
}

Output:

Batch #1, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #2, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #3, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #4, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #5, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #6, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #7, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #8, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #9, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
Batch #10, tickets: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 - OK
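The Thread.Sleep calls in the test above make the intended order very likely, but the test still depends on timing. If you want a check that doesn't, one option is to separate taking a ticket from waiting for it. The sketch below is a hypothetical variant of QueuedLock (the names TicketLock, TakeTicket and TicketLockDemo are made up for this example, and the reentrancy checks are left out for brevity). The tickets are taken on the main thread, so the intended order is known, and the threads are then started in reverse ticket order on purpose.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

// Hypothetical variant of QueuedLock: taking a ticket is split from waiting for it.
public sealed class TicketLock
{
    private readonly object _locker = new object();
    private int _ticketsCount = 0;
    private int _ticketToRide = 1;

    // Hands out tickets 1, 2, 3, ... in call order.
    public int TakeTicket() => Interlocked.Increment(ref _ticketsCount);

    // Blocks until the given ticket is the one being served.
    public void Enter(int ticket)
    {
        Monitor.Enter(_locker);
        while (ticket != _ticketToRide) Monitor.Wait(_locker);
    }

    // Serves the next ticket and wakes all waiters so they can recheck.
    public void Exit()
    {
        _ticketToRide++;
        Monitor.PulseAll(_locker);
        Monitor.Exit(_locker);
    }
}

public static class TicketLockDemo
{
    public static List<int> Run(int count)
    {
        var ticketLock = new TicketLock();
        var served = new List<int>();

        // Tickets are taken sequentially here, so the intended order is 1..count.
        var tickets = Enumerable.Range(0, count)
            .Select(_ => ticketLock.TakeTicket()).ToList();

        // Start the threads in reverse ticket order to stress the lock.
        var threads = Enumerable.Reverse(tickets).Select(ticket =>
        {
            var thread = new Thread(() =>
            {
                ticketLock.Enter(ticket);
                try { served.Add(ticket); } // protected by the lock
                finally { ticketLock.Exit(); }
            });
            thread.Start();
            return thread;
        }).ToArray();
        foreach (var thread in threads) thread.Join();
        return served;
    }

    public static void Main()
    {
        // Prints: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
        Console.WriteLine(string.Join(", ", Run(10)));
    }
}
```

In an API endpoint you would take the ticket as early as possible in the request, since the order in which tickets are handed out is the order the lock will enforce.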
