我有一些复杂的A类,它计算数据(大型矩阵计算(,同时使用B类的输入数据。
A本身使用多个内核。但是,当A需要下一个数据块时,它会等待相当长的时间,因为B在同一主线程中运行。
由于A主要使用 GPU 进行计算,我希望B在 CPU 上同时收集数据。
我的最新方法是:
# every time *A* needs data
def some_computation_method(self):
data = B.get_data()
# start computations with data
。B看起来大致是这样的:
class B(object):
def __init__(self, ...):
...
self._queue = multiprocessing.Queue(10)
loader = multiprocessing.Process(target=self._concurrent_loader)
def _concurrent_loader(self):
while True:
if not self._queue.full():
# here: data loading from disk and pre-processing
# that requires access to instance variables
# like self.path, self.batch_size, ...
self._queue.put(data_chunk)
else:
# don't eat CPU time if A is too busy to consume
# the queue at the moment
time.sleep(1)
def get_data(self):
return self._queue.get()
这种方法可以被认为是一个"pythonic"解决方案吗?
由于我对 Python 的多处理模块没有太多经验,所以我构建了一个简单/简单的方法。然而,它对我来说看起来有点"笨拙"。
让B类同时从磁盘加载数据并通过某个队列提供数据,而主线程运行大量计算并不时消耗队列中的数据,这有什么更好的解决方案?
虽然您的解决方案完全没问题,尤其是对于"小型"项目,但它的缺点是线程与类B
紧密耦合。因此,如果您(例如(出于某种原因想以非线程方式使用B
,那么您就不走运了。
我个人会以线程安全的方式编写类,然后使用外部线程调用它:
class B(object):
def __init__(self):
self._queue = multiprocessing.Queue(10)
...
if __name__ == '__main__':
b = B()
loader = multiprocessing.Process(target=b._concurrent_loader)
loader.start()
这使得B
更灵活,更好地分离依赖项,并且更易于测试。与在类创建时隐式发生相比,它还通过显式创建线程使代码更具可读性。