死锁解决方案:锁排序



在下面的代码中,如果两个线程同时调用transaction()函数,转换不同的帐户,则可能出现死锁。

void transaction(Account from, Account to, double amount)
{
      mutex lock1, lock2;
      lock1 = getlock(from);
      lock2 = getlock(to);
      acquire(lock1);
      acquire(lock2);
         withdraw(from, amount);
         deposit(to, amount);
      release(lock2);
      release(lock1);
}

也就是说,一个线程可能会调用

transaction(checkingaccount, savingsaccount, 25);

另一个可能调用

transaction(savingsaccount, checkingaccount, 50);

什么是解决这个问题的好办法?

我能想到的一种方法是使用一个见证程序,它会提醒用户发生了死锁,但必须有一个更好的解决方案,可以通过修改代码来实现。有什么想法吗?

附言:这是一本关于操作系统的教科书。这不是家庭作业,只是关于死锁的章节的一部分。

这是Java Concurrency in Practice中描述的一个问题(以及解决方案),即项目110.1.2动态锁序死锁,它是专门为Java编写的,但逻辑可以很好地应用于其他上下文(如您的上下文)。

因此,由于我们无法控制参数的提供顺序,我们需要在锁上诱导一个顺序,并在编写程序的整个过程中始终根据诱导的顺序获取它们。

诱导该顺序的一种方法是计算fromto对象的哈希代码,并首先同步从具有较低哈希代码的对象获取锁。在(极少数)两个Account对象具有相同的哈希代码的情况下,我们需要引入第三个锁来"打破"平局。

例如,在Java中,它将是:

int fromHash = System.identityHashCode(from);
int toHash = System.identityHashCode(to);

现在,以您的代码为参考,它可能类似于下面的代码。

Object objectForTieBreakerLock = new Object(); // a valid new object here according to your language
void transaction(Account from, Account to, double amount)
{
      mutex lock1, lock2, tieBreaker;
      lock1 = getlock(from);
      lock2 = getlock(to);
      int fromHash = /*any language specific function to get object hash*/;
      int toHash = /*any language specific function to get object hash*/;
      if (fromHash < toHash) {
          acquire(lock1);
          acquire(lock2);
          doTransaction(from, to, amount);
          release(lock2);
          release(lock1);
      }
      else if (fromHash > toHash) {
          acquire(lock2);
          acquire(lock1);
          doTransaction(from, to, amount);
          release(lock1);
          release(lock2);
      }
      else {
          tieBreaker = getlock(objectForTieBreakerLock);
          acquire(tieBreaker);
          acquire(lock1);
          acquire(lock2);
          doTransaction(from, to, amount);
          release(lock2);
          release(lock1);
          release(tieBreaker);
      }
}
// this must be a private (helper) method
void doTransaction(Account from, Account to, double amount)
{
     withdraw(from, amount);
     deposit(to, amount);
}

附加说明

  • 如果Account有一个唯一的、不可变的、可比较的密钥,比如唯一的数字、标识符或类似的东西,那么诱导锁排序会更容易:按对象的密钥排序,这样就不需要tieBreaker锁了。

  • 完整Java代码示例:http://jcip.net/listings/InduceLockOrder.java

我在C#FeatureLoom库中通过跟踪哪个线程持有或等待哪个锁来解决这个问题。因此,我能够在锁排序死锁实际发生之前识别它,并可以通过";借款;锁。

只需将lock(lockObject){ ... }代码块替换为using(LockOrderDeadlockResolver(lockObject)){ ... },就可以解决可能的死锁!

以下是解决方案的完整实现:

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
namespace FeatureLoom.Core.Synchronization
{
    public static class LockOrderDeadlockResolver
    {
        // Maps the lockObject to the thread that it holds it.
        static ConcurrentDictionary<object, Thread> holdingThreads = new ConcurrentDictionary<object, Thread>();
        // ConditionalWeakTable is used to ensure that the thread is not kept alive after it finished.
        // The alternative would be to remove it from the table each time it does not hold any lock and add it again when it enters a lock,
        // but that would be quite costly.
        static ConditionalWeakTable<Thread, ThreadLockInfo> threadLockInfos = new ConditionalWeakTable<Thread, ThreadLockInfo>();
        private class ThreadLockInfo
        {            
            public List<object> heldLocks = new List<object>();
            public object waitingForLock = null;
        }
        /// <summary>
        /// Enters a Monitor lock for the passed lockObject and returns a disposable handle to be used in a using-statement.
        /// If a deadlock (caused by inverse lock order) is detected, the blocked lock is "borrowed" to finally resolve the deadlock.
        /// Usage is similar to the lock() statement: using(LockOrderDeadlockResolver.Lock(lockObject)){ ... }
        /// Note: Deadlocks can only be detected if all involved threads used LockOrderDeadlockResolver.Lock() instead of lock() or Monitor.Enter().
        /// But beside this, LockOrderDeadlockResolver.Lock() works together with lock() and Monitor.Enter().
        /// </summary>
        /// <param name="lockObject">The object that is used for the Monitor.Enter(lockObject)</param>
        /// <returns></returns>
        public static MonitorLockHandle Lock(object lockObject)
        {
            Thread thread = Thread.CurrentThread;
            ThreadLockInfo threadLockInfo = threadLockInfos.GetValue(thread, _ => new ThreadLockInfo() );            
            if (Monitor.TryEnter(lockObject))
            {                       
                // heldLocks does not have any concurrent access, because only the own thread will access it.
                threadLockInfo.heldLocks.Add(lockObject);
                holdingThreads[lockObject] = thread;
            }
            else
            {                
                if (isDeadlock(lockObject, threadLockInfo.heldLocks))
                {                    
                    // That would normally be a deadlock. But, we know that the owner of the lock is blocked until the current thread finishes,
                    // so we let it "borrow" the lock from the actual owner, in order to resolve the deadlock.
                    return new MonitorLockHandle(null);
                }
                threadLockInfo.waitingForLock = lockObject;
                Monitor.Enter(lockObject);
                threadLockInfo.waitingForLock = null;
                threadLockInfo.heldLocks.Add(lockObject);
                holdingThreads[lockObject] = thread;
            }
            return new MonitorLockHandle(lockObject);
        }
        private static bool isDeadlock(object lockObject, List<object> heldLocks)
        {
            if (heldLocks.Count == 0) return false;
            if (holdingThreads.TryGetValue(lockObject, out Thread holdingThread))
            {
                if (threadLockInfos.TryGetValue(holdingThread, out ThreadLockInfo holdingThreadInfo) &&
                    holdingThreadInfo.waitingForLock != null)
                {
                    if (heldLocks.Contains(holdingThreadInfo.waitingForLock))
                    {
                        // Current Thread A holds a lock that Thread B waits for. But Thread B holds the lock that Thread A wants to get right now. That is a deadlock.
                        return true;
                    }
                    else
                    {
                        // Thread B holds the lock that Thread A wants to get right now, but it waits for a lock that Thread C holds.
                        // We need to check if Thread C waits for the lock that Thread A holds, because that would be a deadlock, too.
                        // The recursion can detect a deadlock chain of any length.
                        return isDeadlock(holdingThreadInfo.waitingForLock, heldLocks);
                    }
                }
            }
            return false;
        }
        private static void ReleaseLock(object lockObject)
        {
            Thread thread = Thread.CurrentThread;
            if (threadLockInfos.TryGetValue(thread, out var threadLockInfo))
            {
                threadLockInfo.heldLocks.Remove(lockObject);
            }
            holdingThreads.TryRemove(lockObject, out _);
            Monitor.Exit(lockObject);
        }
        // To be used in a using statement to ensure the release of the lock.
        public struct MonitorLockHandle :IDisposable
        {
            object lockObject;
            public MonitorLockHandle(object lockObject)
            {
                this.lockObject = lockObject;
            }
            public void Dispose()
            {
                // If no lockObject was set, the lock was "borrowed", due to a deadlock.
                if (lockObject != null) ReleaseLock(lockObject);
            }
        }
    }
}

最新更新