如果以前有人问过这个问题,请原谅。我到处找了很多,但我觉得我没有合适的词汇来通过搜索网络找到这个。
我有一个python中的多线程应用程序。我希望能够锁定特定的代码块,但只能锁定到具有特定条件的其他线程。让我举一个例子:有三个线程,thread_a
、thread_b
和thread_c
。每个线程可以在任何时间运行函数foo
。我不希望bar
彼此相等的任何两个线程能够同时访问Code block ALPHA
。但是,我不想阻塞bar
值不同的线程。在这种情况下,假设thread_a
有一个bar == "cat"
,并且首先命中线(3)
。在thread_a
到达(5)
行之前,假设thread_b
,而bar == "cat"
到达(3)
行。我想让thread_b
等一下。但如果thread_c
和bar == "dog"
一起出现,我希望它能够继续下去。
(1) def foo(bar):
(2)
(3) lock(bar)
(4) # Code block ALPHA (two threads with equivalent bar should not be in here)
(5) unlock(bar)
另一方面,bar
的可能值是完全不可预测的,但碰撞的可能性非常高。
谢谢你的帮助。我正在查看的库是python线程库
更新
好消息:我能够通过我拼凑的一个有点粗糙的测试台,用我的原始答案重现你遇到的release_lock
问题,并使用计数机制(正如你所建议的)解决这个问题——至少在我的测试设备所能告诉的范围内。
现在使用了两个独立的共享字典,一个像以前一样跟踪与每个锁关联的"名称"或值,另一个跟踪在给定时间有多少线程在使用每个锁。
和以前一样,锁名称必须是可散列的值,这样它们才能用作字典中的键。
import threading
namespace_lock = threading.Lock()
namespace = {}
counters = {}
def aquire_lock(value):
with namespace_lock:
if value in namespace:
counters[value] += 1
else:
namespace[value] = threading.Lock()
counters[value] = 1
namespace[value].acquire()
def release_lock(value):
with namespace_lock:
if counters[value] == 1:
del counters[value]
lock = namespace.pop(value)
else:
counters[value] -= 1
lock = namespace[value]
lock.release()
# sample usage
def foo(bar):
aquire_lock(bar)
# Code block ALPHA (two threads with equivalent bar should not be in here)
release_lock(bar)
有一个锁,每当线程试图进入或退出关键部分时都会获取,并为bar
的每个值使用单独的条件变量。以下内容可能会被优化以创建更少的条件变量,但在这篇文章中这样做感觉像是过早的优化:
import collections
import contextlib
import threading
lock = threading.Lock()
wait_tracker = collections.defaultdict(lambda: (False, 0, threading.Condition(lock)))
@contextlib.contextmanager
def critical(bar):
with lock:
busy, waiters, condition = wait_tracker[bar]
if busy:
# Someone with the same bar value is in the critical section.
# Record that we're waiting.
waiters += 1
wait_tracker[bar] = busy, waiters, condition
# Wait for our turn.
while wait_tracker[bar][0]:
condition.wait()
# Record that we're not waiting any more.
busy, waiters, condition = wait_tracker[bar]
waiters -= 1
# Record that we're entering the critical section.
busy = True
wait_tracker[bar] = busy, waiters, condition
try:
# Critical section runs here.
yield
finally:
with lock:
# Record that we're out of the critical section.
busy, waiters, condition = wait_tracker[bar]
busy = False
if waiters:
# Someone was waiting for us. Tell them it's their turn now.
wait_tracker[bar] = busy, waiters, condition
condition.notify()
else:
# No one was waiting for us. Clean up a bit so the wait_tracker
# doesn't grow forever.
del wait_tracker[bar]
然后,每个想要进入关键部分的线程都会执行以下操作:
with critical(bar):
# Critical section.
此代码未经测试,并行性很难实现,尤其是锁和共享内存的并行性。我不能保证它会起作用。
这里有一个面向类的解决方案,适用于需要几个单独的锁组的情况。
# A dynamic group of locks, useful for parameter based locking.
class LockGroup(object):
def __init__(self):
self.lock_dict = {}
self.lock = threading.Lock()
# Returns a lock object, unique for each unique value of param.
# The first call with a given value of param creates a new lock, subsequent
# calls return the same lock.
def get_lock(self, param):
with self.lock:
if param not in self.lock_dict:
self.lock_dict[param] = threading.Lock()
return self.lock_dict[param]