清洁,pythonic方法,用于并发数据加载程序



python 3

我想知道一个真正干净的Pythonic并发数据加载器的外观。我需要这种方法,对于我的项目,该项目对数据进行了大量计算,该数据太大而无法完全适合记忆。因此,我实现了应同时运行并将数据存储在队列中的数据加载程序,以便主过程可以正常工作,而(同时)下一个数据正在加载&准备。当然,队列在空空前时应阻止它(试图消耗更多项目的主要过程 ->队列应等待新数据)或完整的数据(工作过程应等到主进程将数据消耗出排队以防止脱离 - 内存错误)。

我已经写了一堂课来使用Python的multiprocessing模块(multiprocessing.Queuemultiprocessing.Process)来满足这种需求。该类的关键部分实现如下:

import multiprocessing as mp
from itertools import cycle    
class ConcurrentLoader:
    def __init__(path_to_data, queue_size, batch_size):
        self._batch_size
        self._path = path_to_data
        filenames = ... # filenames for path 'path_to_data',
                        # get loaded using glob
        self._files = cycle()
        self._q = mp.Queue(queue_size)
        ...
        self._worker = mp.Process(target=self._worker_func, daemon=True)
        self._worker.start() # only started, never stopped
    def _worker_func(self):
        while True:
            buffer = list()
            for i in range(batch_size):
                f = next(self._files)
                ... # load f and do some pre-processing with NumPy
                ... # add it to buffer
            self._q.put(np.array(buffer).astype(np.float32))
    def get_batch_data(self):
        self._q.get()

课程还有更多方法,但它们都是为了"便利功能"。例如,它计算在每个文件加载的频率,整个数据集加载的频率等频率中,但它们在Python中很容易实现,并且不会浪费太多的计算时间(集合,dicts,.. ..。)。

另一方面,由于I/O和预处理,数据部分本身甚至可能需要几秒钟。这就是为什么我希望这种同时发生的原因。

ConcurrentLoader应该:

  • 块主进程:如果调用get_batch_data,但是队列为空
  • 块工艺过程:如果队列已满,以防止记忆外错误并防止while True浪费资源
  • 对使用ConcurrentLoader的任何类"透明":他们应该只提供数据路径并使用get_batch_data而不注意到这实际上是同时起作用的("无用的无用使用")
  • 当主要流程死亡以再次免费资源
  • 时,终止其工人

考虑这些目标(我忘记了什么?)我该怎么做才能增强当前的实施?它是螺纹/死锁安全吗?是否有更多的" Pythonic"实施方式?我可以让它更加干净吗?浪费资源以某种方式吗?

任何使用ConcurrentLoader的类都会大致遵循此设置:

class Foo:
    ...
    def do_something(self):
        ...
        data1 = ConcurrentLoader("path/to/data1", 64, 8)
        data2 = ConcurrentLoader("path/to/data2", 256, 16)
        ...
        sample1 = data1.get_batch_data()
        sample2 = data2.get_batch_data()
        ... # heavy computations with data contained in 'sample1' & 'sample2'
            # go *here*

请指出任何类型的错误,以改善我的方法或提供自己的,更清洁,更多的Pythonic方法。

  • multiprocessing.Queue为空/完整时阻止 get()/put()被称为自动发生。

  • 此行为与调用功能是透明的。

  • self._worker.start()之前使用self._worker.daemon = True,因此当主进程退出时,工人将自动杀死

相关内容

  • 没有找到相关文章

最新更新