基于值的线程锁



如果以前有人问过这个问题,请原谅。我到处找了很多,但我觉得我没有合适的词汇来通过搜索网络找到这个。

我有一个python中的多线程应用程序。我希望能够锁定特定的代码块,但只能锁定到具有特定条件的其他线程。让我举一个例子:有三个线程,thread_athread_bthread_c。每个线程可以在任何时间运行函数foo。我不希望bar彼此相等的任何两个线程能够同时访问Code block ALPHA。但是,我不想阻塞bar值不同的线程。在这种情况下,假设thread_a有一个bar == "cat",并且首先命中线(3)。在thread_a到达(5)行之前,假设thread_b,而bar == "cat"到达(3)行。我想让thread_b等一下。但如果thread_cbar == "dog"一起出现,我希望它能够继续下去。

(1) def foo(bar):
(2)    
(3)     lock(bar)
(4)     # Code block ALPHA (two threads with equivalent bar should not be in here)
(5)     unlock(bar)

另一方面,bar的可能值是完全不可预测的,但碰撞的可能性非常高。

谢谢你的帮助。我正在查看的库是python线程库

更新

好消息:我能够通过我拼凑的一个有点粗糙的测试台,用我的原始答案重现你遇到的release_lock问题,并使用计数机制(正如你所建议的)解决这个问题——至少在我的测试设备所能告诉的范围内。

现在使用了两个独立的共享字典,一个像以前一样跟踪与每个锁关联的"名称"或值,另一个跟踪在给定时间有多少线程在使用每个锁。

和以前一样,锁名称必须是可散列的值,这样它们才能用作字典中的键。

import threading
namespace_lock = threading.Lock()
namespace = {}
counters = {}
def aquire_lock(value):
with namespace_lock:
if value in namespace:
counters[value] += 1
else:
namespace[value] = threading.Lock()
counters[value] = 1
namespace[value].acquire()
def release_lock(value):
with namespace_lock:
if counters[value] == 1:
del counters[value]
lock = namespace.pop(value)
else:
counters[value] -= 1
lock = namespace[value]
lock.release()
# sample usage    
def foo(bar):
aquire_lock(bar)
# Code block ALPHA (two threads with equivalent bar should not be in here)
release_lock(bar)

有一个锁,每当线程试图进入或退出关键部分时都会获取,并为bar的每个值使用单独的条件变量。以下内容可能会被优化以创建更少的条件变量,但在这篇文章中这样做感觉像是过早的优化:

import collections
import contextlib
import threading
lock = threading.Lock()
wait_tracker = collections.defaultdict(lambda: (False, 0, threading.Condition(lock)))
@contextlib.contextmanager
def critical(bar):
with lock:
busy, waiters, condition = wait_tracker[bar]
if busy:
# Someone with the same bar value is in the critical section.
# Record that we're waiting.
waiters += 1
wait_tracker[bar] = busy, waiters, condition
# Wait for our turn.
while wait_tracker[bar][0]:
condition.wait()
# Record that we're not waiting any more.
busy, waiters, condition = wait_tracker[bar]
waiters -= 1
# Record that we're entering the critical section.
busy = True
wait_tracker[bar] = busy, waiters, condition
try:
# Critical section runs here.
yield
finally:
with lock:
# Record that we're out of the critical section.
busy, waiters, condition = wait_tracker[bar]
busy = False
if waiters:
# Someone was waiting for us. Tell them it's their turn now.
wait_tracker[bar] = busy, waiters, condition
condition.notify()
else:
# No one was waiting for us. Clean up a bit so the wait_tracker
# doesn't grow forever.
del wait_tracker[bar]

然后,每个想要进入关键部分的线程都会执行以下操作:

with critical(bar):
# Critical section.

此代码未经测试,并行性很难实现,尤其是锁和共享内存的并行性。我不能保证它会起作用。

这里有一个面向类的解决方案,适用于需要几个单独的锁组的情况。

# A dynamic group of locks, useful for parameter based locking.
class LockGroup(object):
def __init__(self):
self.lock_dict = {}
self.lock = threading.Lock()
# Returns a lock object, unique for each unique value of param.
# The first call with a given value of param creates a new lock, subsequent
# calls return the same lock.
def get_lock(self, param):
with self.lock:
if param not in self.lock_dict:
self.lock_dict[param] = threading.Lock()
return self.lock_dict[param]

最新更新