在下面的代码中,如果两个线程同时调用transaction()
函数,转换不同的帐户,则可能出现死锁。
void transaction(Account from, Account to, double amount)
{
mutex lock1, lock2;
lock1 = getlock(from);
lock2 = getlock(to);
acquire(lock1);
acquire(lock2);
withdraw(from, amount);
deposit(to, amount);
release(lock2);
release(lock1);
}
也就是说,一个线程可能会调用
transaction(checkingaccount, savingsaccount, 25);
另一个可能调用
transaction(savingsaccount, checkingaccount, 50);
什么是解决这个问题的好办法?
我能想到的一种方法是使用一个见证程序,它会提醒用户发生了死锁,但必须有一个更好的解决方案,可以通过修改代码来实现。有什么想法吗?
附言:这是一本关于操作系统的教科书。这不是家庭作业,只是关于死锁的章节的一部分。
这是Java Concurrency in Practice中描述的一个问题(以及解决方案),即项目110.1.2动态锁序死锁,它是专门为Java编写的,但逻辑可以很好地应用于其他上下文(如您的上下文)。
因此,由于我们无法控制参数的提供顺序,我们需要在锁上诱导一个顺序,并在编写程序的整个过程中始终根据诱导的顺序获取它们。
诱导该顺序的一种方法是计算from
和to
对象的哈希代码,并首先同步从具有较低哈希代码的对象获取锁。在(极少数)两个Account
对象具有相同的哈希代码的情况下,我们需要引入第三个锁来"打破"平局。
例如,在Java中,它将是:
int fromHash = System.identityHashCode(from);
int toHash = System.identityHashCode(to);
现在,以您的代码为参考,它可能类似于下面的代码。
Object objectForTieBreakerLock = new Object(); // a valid new object here according to your language
void transaction(Account from, Account to, double amount)
{
mutex lock1, lock2, tieBreaker;
lock1 = getlock(from);
lock2 = getlock(to);
int fromHash = /*any language specific function to get object hash*/;
int toHash = /*any language specific function to get object hash*/;
if (fromHash < toHash) {
acquire(lock1);
acquire(lock2);
doTransaction(from, to, amount);
release(lock2);
release(lock1);
}
else if (fromHash > toHash) {
acquire(lock2);
acquire(lock1);
doTransaction(from, to, amount);
release(lock1);
release(lock2);
}
else {
tieBreaker = getlock(objectForTieBreakerLock);
acquire(tieBreaker);
acquire(lock1);
acquire(lock2);
doTransaction(from, to, amount);
release(lock2);
release(lock1);
release(tieBreaker);
}
}
// this must be a private (helper) method
void doTransaction(Account from, Account to, double amount)
{
withdraw(from, amount);
deposit(to, amount);
}
附加说明
如果
Account
有一个唯一的、不可变的、可比较的密钥,比如唯一的数字、标识符或类似的东西,那么诱导锁排序会更容易:按对象的密钥排序,这样就不需要tieBreaker
锁了。完整Java代码示例:http://jcip.net/listings/InduceLockOrder.java
我在C#FeatureLoom库中通过跟踪哪个线程持有或等待哪个锁来解决这个问题。因此,我能够在锁排序死锁实际发生之前识别它,并可以通过";借款;锁。
只需将lock(lockObject){ ... }
代码块替换为using(LockOrderDeadlockResolver(lockObject)){ ... }
,就可以解决可能的死锁!
以下是解决方案的完整实现:
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
namespace FeatureLoom.Core.Synchronization
{
public static class LockOrderDeadlockResolver
{
// Maps the lockObject to the thread that it holds it.
static ConcurrentDictionary<object, Thread> holdingThreads = new ConcurrentDictionary<object, Thread>();
// ConditionalWeakTable is used to ensure that the thread is not kept alive after it finished.
// The alternative would be to remove it from the table each time it does not hold any lock and add it again when it enters a lock,
// but that would be quite costly.
static ConditionalWeakTable<Thread, ThreadLockInfo> threadLockInfos = new ConditionalWeakTable<Thread, ThreadLockInfo>();
private class ThreadLockInfo
{
public List<object> heldLocks = new List<object>();
public object waitingForLock = null;
}
/// <summary>
/// Enters a Monitor lock for the passed lockObject and returns a disposable handle to be used in a using-statement.
/// If a deadlock (caused by inverse lock order) is detected, the blocked lock is "borrowed" to finally resolve the deadlock.
/// Usage is similar to the lock() statement: using(LockOrderDeadlockResolver.Lock(lockObject)){ ... }
/// Note: Deadlocks can only be detected if all involved threads used LockOrderDeadlockResolver.Lock() instead of lock() or Monitor.Enter().
/// But beside this, LockOrderDeadlockResolver.Lock() works together with lock() and Monitor.Enter().
/// </summary>
/// <param name="lockObject">The object that is used for the Monitor.Enter(lockObject)</param>
/// <returns></returns>
public static MonitorLockHandle Lock(object lockObject)
{
Thread thread = Thread.CurrentThread;
ThreadLockInfo threadLockInfo = threadLockInfos.GetValue(thread, _ => new ThreadLockInfo() );
if (Monitor.TryEnter(lockObject))
{
// heldLocks does not have any concurrent access, because only the own thread will access it.
threadLockInfo.heldLocks.Add(lockObject);
holdingThreads[lockObject] = thread;
}
else
{
if (isDeadlock(lockObject, threadLockInfo.heldLocks))
{
// That would normally be a deadlock. But, we know that the owner of the lock is blocked until the current thread finishes,
// so we let it "borrow" the lock from the actual owner, in order to resolve the deadlock.
return new MonitorLockHandle(null);
}
threadLockInfo.waitingForLock = lockObject;
Monitor.Enter(lockObject);
threadLockInfo.waitingForLock = null;
threadLockInfo.heldLocks.Add(lockObject);
holdingThreads[lockObject] = thread;
}
return new MonitorLockHandle(lockObject);
}
private static bool isDeadlock(object lockObject, List<object> heldLocks)
{
if (heldLocks.Count == 0) return false;
if (holdingThreads.TryGetValue(lockObject, out Thread holdingThread))
{
if (threadLockInfos.TryGetValue(holdingThread, out ThreadLockInfo holdingThreadInfo) &&
holdingThreadInfo.waitingForLock != null)
{
if (heldLocks.Contains(holdingThreadInfo.waitingForLock))
{
// Current Thread A holds a lock that Thread B waits for. But Thread B holds the lock that Thread A wants to get right now. That is a deadlock.
return true;
}
else
{
// Thread B holds the lock that Thread A wants to get right now, but it waits for a lock that Thread C holds.
// We need to check if Thread C waits for the lock that Thread A holds, because that would be a deadlock, too.
// The recursion can detect a deadlock chain of any length.
return isDeadlock(holdingThreadInfo.waitingForLock, heldLocks);
}
}
}
return false;
}
private static void ReleaseLock(object lockObject)
{
Thread thread = Thread.CurrentThread;
if (threadLockInfos.TryGetValue(thread, out var threadLockInfo))
{
threadLockInfo.heldLocks.Remove(lockObject);
}
holdingThreads.TryRemove(lockObject, out _);
Monitor.Exit(lockObject);
}
// To be used in a using statement to ensure the release of the lock.
public struct MonitorLockHandle :IDisposable
{
object lockObject;
public MonitorLockHandle(object lockObject)
{
this.lockObject = lockObject;
}
public void Dispose()
{
// If no lockObject was set, the lock was "borrowed", due to a deadlock.
if (lockObject != null) ReleaseLock(lockObject);
}
}
}
}