是否有一个同步锁与关键在Python?



我需要一个Lock对象,类似于multiprocessing.Manager().Lock(),只允许从实际获得它的进程中释放。

我的手动实现将类似于以下内容:

class KeyLock:
def __init__(self):
self._lock = Lock()
self._key: Optional[str] = None
def acquire(self, key: 'str', blocking: bool = True, timeout: float = 10.0) -> bool:
if self._lock.acquire(blocking=blocking, timeout=timeout):
self._key = key
return True
return False
def release(self, key, raise_error: bool = False) -> bool:
if self._key == key:
self._lock.release()
return True
if raise_error:
raise RuntimeError(
'KeyLock.released called with a non matchin key!'
)
return False
def locked(self):
return self._lock.locked()

要创建此锁的实例并在多个进程中使用它,我将使用自定义管理器类:

class KeyLockManager(BaseManager):
pass

KeyLockManager.register('KeyLock', KeyLock)
manager = KeyLockManager()
manager.start()
lock = manager.KeyLock()

从不同的进程,我可以做:

lock.acquire(os.getpid())
# use shared ressource
...
lock.release(os.getpid())

按预期工作,但对于一个相对简单的任务来说,这似乎是一个相当大的努力。所以我想知道是否有一个更简单的方法来做到这一点?

根据定义,multiprocessing.RLock只能由获取它的进程释放。或者您可以考虑如下的情况,其中Lock实例是封装的,并且意味着仅用作上下文管理器,因此除非您已经获得它,否则不可能释放它,除非您违反封装。当然,可以向类中添加额外的保护,以保护违反封装并访问Lock实例本身的企图。当然,在您的实现中,也可能总是违反封装,因为您可以获得其他进程的pid。所以我们假设所有的用户都遵守规则。

from multiprocessing import Lock
class KeyLock:
def __init__(self):
self.__lock = Lock()
def __enter__(self):
self.__lock.acquire()
return None
def __exit__(self, exc_type, exc_val, exc_tb):
self.__lock.release()
return False
# Usage:
key_lock = KeyLock()
with key_lock:
# do something
...

相关内容

  • 没有找到相关文章

最新更新