Python多处理工作者记忆消耗无限地增加



我有一个excel 2010文件(xlsx(的列表,需要搜索特定值。由于XSLX是一种二进制格式,因此无法使用普通文本编辑器来完成。所以我为每个文件做以下操作

  1. 获取文件名
  2. 在熊猫中打开
  3. 将dataframe转换为numpy数组
  4. 检查数组是否值

这需要多处理,因为它不是i/o绑定。熊猫的东西和阵列转换需要时间。因此,我已经设置了脚本的多处理版本(请参见下文(:

问题是每个工作过程的内存消耗。尽管每个XLSX文件仅为100KB,但它在每个工作人员的2GB中连续达到峰值。我不明白为什么在处理新文件之前未发布内存。这样,我在处理文件列表之前就用完了内存。

问题似乎不是队列,而是熊猫的东西。

这是我的代码。它可以使用系统上的任何XLSX文件进行测试。

import pandas as pd
import multiprocessing as mp
import glob
path = r'c:temp'
fileFilter = 'serial.xlsx'
searchString = '804.486'

def searchFile(tasks, results, searchString):
    """Iterates over files in tasks and searches in file for the
    occurence of 'searchString'.
    Args:
    -----
    tasks: queue of strings
        Files to look in
    results: queue of strings
        Files where the searchString was found
    searchString: str
        the string to be searched
    """
    # for files in the queue
    for task in iter(tasks.get, 'STOP'):
        # read the filestructre into memory
        xfile = pd.ExcelFile(task)
        # iterate all sheets
        for sheet in xfile.sheet_names[:3]:
            # read the sheet
            data = pd.read_excel(xfile, sheet)
            # check if searchString is in numpy representation of dataframe
            if searchString in data.values.astype(str):
                # put filename in results queue
                results.put(task)
                break
        xfile.close()
if __name__ == "__main__":
    # get all files matching the filter that are in the root path
    print('gathering files')
    files = glob.glob(path + '**{}'.format(fileFilter), recursive=True)
    # setup of queues and variables
    n_proc = 2
    tasks = mp.Queue()
    results = mp.Queue()
    print('Start processing')
    # setup processes and start them
    procs = [mp.Process(target=searchFile,
                        args=(tasks, results, searchString))
             for x in range(n_proc)]
    for p in procs:
        p.daemon = True
        p.start()
    # populate queue
    for file in files:
        tasks.put(file)
    for proc in procs:
        tasks.put('STOP')
    for p in procs:
        p.join()
    # print results
    for result in range(results.qsize()):
        print(results.get())
    print('Done')

在GC中似乎有问题,在您永远不会离开的功能上下文中无法收集熊猫框架。您可以使用multiprocessing.Pool.map,可以为您进行排队处理。每个项目都会要求工人功能,让GC完成工作。另外,您可以使用maxtasksperchild池构造函数来限制工人处理的项目数量。

import glob
import multiprocessing

def searchFile(task, searchString):
    xfile = pd.ExcelFile(task)
    ...
    if found:
        return task

if __name__ == '__main__':
    files = glob.glob(path + '**{}'.format(fileFilter), recursive=True)
    searchString = '804.486'
    pool = multiprocessing.Pool(2, maxtasksperchild=10)
    args = ((fname, searchString) for fname in files)
    matchedFiles = filter(None, pool.map(searchFile, args))
    pool.close()

最新更新