如何对以实体id为条件的C#互斥块进行编码



我正在寻找一种C#模式来编码同步操作,包括为特定实体写入两个不同的数据库,这样我就可以避免在同一实体上同时执行操作的竞争条件。

例如,线程1和线程2同时处理实体X上的操作。该操作将X的信息写入数据库A(在我的情况下,是对MongoDB的追加)和数据库B(对SqlServer的插入)。线程3正在实体Y上处理相同的操作。所需的行为是:

  • 线程1在处理实体X对A和B的写入时阻塞线程2
  • 线程2等待,直到线程1完成对A和B的写入,然后对实体X的A和B进行写入
  • 线程3未被阻塞,并且在线程1处理时处理对实体Y的A和B的写入

我试图避免的行为是:

  • 线程1为实体X写入A
  • 线程2为实体X写入A
  • 线程2为实体X写入B
  • 线程1为实体X写入B

我可以在所有线程中使用互斥锁,但我并不真的想阻止不同实体的操作。

我建议使用简单锁(如果它在代码的一个区域),因为它将处理不同的对象(意味着.net对象),但具有相同的值(因为它是同一个实体),所以我宁愿使用某种形式的实体代码。如果实体有某种形式的代码,我会使用它——例如:

但当然,你必须小心死锁。还有String.Intern很棘手,因为只要应用程序运行,它就会对字符串进行Intern。

lock(String.Intern(myEntity.Code))
{
SaveToDatabaseA(myEntity);
SaveToDatabaseB(myEntity);
}

但看起来你想要某种复制机制。然后我宁愿在数据库级别(而不是代码级别)上进行

[更新]

你更新了这个问题的信息,它是在多台服务器上完成的。这些信息在这里是至关重要的:)普通锁不起作用。

当然,您可以在不同的服务器上同步锁,但这与分布式事务类似。从理论上讲,你可以做到,但大多数人只是尽可能地避免,他们会利用解决方案的架构来简化流程。

[更新2]

您可能还会发现以下有趣的内容:.NET 中的分布式锁定

:)

对于多个进程1,使用lock语句是不够的。即使是命名/系统信号量也仅限于一台机器,因此来自多个服务器的信号量不足。

如果重复处理是可以的,并且可以选择一个"赢家",那么只写/更新或使用乐观并发就足够了。如果需要维护更强的进程一次性并发保证,则需要使用全局锁定机制——SQL Server通过sp_getapplock支持这种机制。

同样,可以更新模型,以便每个代理"请求"下一个工作单元,从而可以集中控制调度,并且根据ID等,一个实体一次只给一个代理进行处理。另一种选择可能是使用像RabbitMQ(或Kafka等,fsvo)这样的消息传递系统;对于RabbitMQ,甚至可以使用一致哈希来确保(在大多数情况下)不同的消费者接收到不重叠的消息。细节因所使用的实现方式而异。

由于SQL RDBMS和MongoDB的性质不同(尤其是当用作"缓存"时),放松限制和/或将MongoDB用作直通读取(这是使用缓存的好方法)就足够了。这可以缓解成对写入问题,尽管它不会阻止对相同项目的全局并发处理。

1即使锁语句在全局上是不充分的,它仍然可以在单个进程中的线程之间本地使用,以减少局部争用和/或最大限度地减少全局锁定。


下面的答案是针对原始问题的,假设是单个过程

避免通过多个线程同时处理同一对象的"标准"方法是在特定对象上使用lock语句。在对象本身上获取锁,使得lock(X)lock(Y)!ReferenceEquals(X,Y)时是独立的。

lock语句获取给定对象的互斥锁,执行语句块,然后释放锁当锁被持有时,持有锁的线程可以再次获取并释放锁任何其他线程都被阻止获取锁定,并等待锁定释放

lock (objectBeingSaved) {
// This code execution is mutually-exclusive over a specific object..
// ..and independent (non-blocking) over different objects.
Process(objectBeingSaved);
}

本地进程锁定不一定会转化为对数据库访问的充分保证,或者当访问溢出到进程时还应该考虑锁的范围:例如,它应该涵盖所有处理、仅保存还是其他工作单元?

为了控制哪些对象被锁定,并减少不希望的/意外的锁定交互的机会,有时建议为对象显式地(且仅用于)添加一个具有最特定可见性的字段,以建立锁定。如果需要考虑的话,这也可以用于对应该相互锁定的对象进行分组。

也可以使用锁定池,尽管这往往是一个更"高级"的用例,只有特定的适用性。使用池还允许使用信号量(在更具体的用例中)以及简单的锁。

如果每个外部ID需要有一个锁,一种方法是将正在处理的实体与池集成,在实体之间建立锁:

// Some lock pool. Variations of the strategy:
// - Weak-value hash table
// - Explicit acquire/release lock
// - Explicit acquire/release from ctor and finalizer (or Dispose)
var locks = CreateLockPool();
// When object is created, assign a lock object
var entity = CreateEntity();
// Returns same lock object (instance) for the given ID, and a different
// lock object (instance) for a different ID.
etity.Lock = GetLock(locks, entity.ID);
lock (entity.Lock) {
// Mutually exclusive per whatever rules are to select the lock
Process(entity);
}

另一种变体是本地化池,而不是每个实体本身携带一个锁对象。它在概念上与上面的模型相同,只是从外到内翻转。要点如下。YMMV。

private sealed class Locker { public int Count; }
IDictionary<int, Locker> _locks = new Dictionary<int, Locker>();
void WithLockOnId(int id, Action action) {
Locker locker;
lock (_locks) {
// The _locks might have lots of contention; the work
// done inside is expected to be FAST in comparison to action().
if (!_locks.TryGetValue(id, out locker)
locker = _locks[id] = new Locker();
++locker.Count;
}
lock (locker) {
// Runs mutually-exclusive by ID, as established per creation of
// distinct lock objects.
action();
}
lock (_locks) {
// Don't forget to take out the garbage..
// This would be better with try/finally, which is left as an exercise
// to the reader, along with fixing any other minor errors.
if (--_locks[id].Count == 0)
_locks.Remove(id);
}
}
// And then..
WithLockOnId(x.ID, () => Process(x));

从侧面看,另一种方法是跨线程/处理单元"分割"实体。因此,保证每个线程永远不会处理与另一个线程相同的实体:X、Y、Z总是转到#1,P、D、Q总是转到#2。(优化吞吐量有点复杂。)

var threadIndex = entity.ID % NumThreads;
QueueWorkOnThread(threadIndex, entity); // eg. add to List<ConcurrentQueue>

最新更新