Python:对 Python 类中的列表或字典的信号量保护



my code

class myclass:
def __init__(self):
self.x = {}
self.y = []
self.semaphore = threading.Semaphore()
def __semaphore(func):
def wrapper(**args. *kw):
args[0].__sync_semaphore.acquire()
ret = func(*args, **kw)
args[0].__sync_semaphore.release()
return ret
return wrapper
@__semaphore
def __setattr__(self, name, value):
super().__setattr__(name, value)
@__semaphore
def save_to_disk(self):
""" access to my_class.x and my_class.y """
my_class = myclass()
my_class.x['a'] = 123

使用上面的代码,我尝试使用信号量来保护我的x,并在调用save_to_disk时保护y。但是当我打电话给my_class.x['a'] = 123时,my_class.__setattr__没有被召唤。所以我的x不受保护。

我有两个问题:

  • 当我调用my_class.x['a'] = 123调用哪个 Python 函数时?
  • 如何保护我的xymy_class,而不是全球listdict;我的xy也可能有listdict

更新:我想更新上述随机代码的一些概念。 我想创建一个类似内核的AI。AI 必须同时完成 2 项工作。一个是收集我提供的所有信息。二是达到阈值时必须将信息保存到磁盘(我不希望它杀死我的RAM(

我试图做什么

  • 创建一个继承dictlistclass,以覆盖{}[],但它需要我更新所有{}[]。这不是效率。
  • 目前我正在尝试创建一个读/写信号量,然后覆盖dict().__setitem__list.append等。但我不知道会发生什么

TLDR:仅在myclass方法上执行此操作是没有用的,因为不仅涉及myclassmy_class.x['a'] = 123等效于此:

def set_x_a(obj: myclass, value):
x = obj.__getattr__('x')   # fetch `x` via `myclass` method
x.__setitem__('a', value)  # set `'a'` via `type(x)` method
set_x_a(my_class, 123)

请注意调用x.__setitem__时对my_class.__getattr__的调用是如何完成的。因此,my_class方法内部的任何同步都属于错误的范围。


您可以通过仅在同步块中授予对类字段的访问权限来保护类字段免受并发访问。

Python 同步块的基本方法是with语句,例如可以与threading锁一起使用。为了简化创建自定义块的过程,contextlib.contextmanager使用单个生成器(而不是两种方法(。最后,property允许向属性添加行为,例如同步。

import sys
import threading
from contextlib import contextmanager
class Synchronized:
def __init__(self):
self._x = {}  # actual data, stored internally
self._mutex = threading.RLock()
@property
@contextmanager
def x(self):           # public behaviour of data
with self._mutex:  # only give access when synchronised
yield self._x
def save(self, file=sys.stdout):
with self._mutex:  # only internally access when synchronised
file.write(str(self._x))

重要的更改是不再直接公开dict属性。它只能通过持有锁来使用。

synced = Synchronized()
with synced.x as x:
x['a'] = 123
x['b'] = 42
synced.save()

您可以将此模式扩展到其他属性,并改进对属性的保护。例如,您可以yieldself._x的副本或collections.ChainMap,并在块的末尾显式更新内部状态 - 从而使外部引用的效果无效。

Q1

当我调用 my_class.x['a'] = 123 时,调用哪个 Python 函数?

先呼叫def __getattribute__(self, item):

关于您的想法

我想创建一个类似内核的AI。AI 必须同时完成 2 项工作。一个是收集我提供的所有信息。二是达到阈值时必须将信息保存到磁盘(我不希望它杀死我的RAM(

问题是因为两个线程想要共享同一个变量,对吧?

如果是这样,也许您可以尝试一次只让一个线程工作,然后不必担心资源会发生变化。

例如:

import threading
import numpy as np
from time import time, sleep

def get_data(share_list, share_dict):
num_of_data = 0
while num_of_data < 6:
t_s = time()
if is_writing_flag.is_set():
sleep(REFRESH_TIME)
continue
while 1:
data = np.random.normal(1, 1, (10,))
threshold = all(data > 1.6)
if threshold:
share_list.append(data)
share_dict['time'] = time() - t_s
num_of_data += 1
is_writing_flag.set()
break
close_keeper_flag.clear()

def data_keeper(share_list, share_dict):
while close_keeper_flag.is_set():
while is_writing_flag.is_set():
# save as csv, json, yaml...
print(share_list.pop())
print(share_dict['time'])
is_writing_flag.clear()
sleep(REFRESH_TIME)

def main():
share_list = []
share_dict = {}
td_collect_data = threading.Thread(target=get_data, name='collect some data', args=[share_list, share_dict])
td_data_keeper = threading.Thread(target=data_keeper, name='save data.', args=[share_list, share_dict])
for th in (td_collect_data, td_data_keeper):
th.start()

if __name__ == '__main__':
REFRESH_TIME = 0.2
is_writing_flag = threading.Event()
is_writing_flag.clear()
close_keeper_flag = threading.Event()
close_keeper_flag.set()
main()

但是我更喜欢使用asyncio来处理这个问题,例如

import asyncio
import numpy as np
from time import time

async def take_data(num_of_data):
count = 0
t_s = time()
while 1:
if count == num_of_data:
break
data = await collect_data()
cost_time = time() - t_s
yield list(data), dict(time=cost_time)
t_s = time()
count += 1

async def collect_data():
while 1:
data = np.random.normal(1, 1, (10,))
threshold = all(data > 1.6)
if threshold:
break
return data

async def ai_process():
async for res_list, res_dict in take_data(5):
print(res_dict['time'])
# save_to_desktop()
...

def main():
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([ai_process()]))
loop.close()

if __name__ == '__main__':
main()

如果这对你仍然没有用,我会删除答案。如果您有任何疑问,请告诉我,谢谢。

最新更新