我有一个excel 2010文件(xlsx(的列表,需要搜索特定值。由于XSLX是一种二进制格式,因此无法使用普通文本编辑器来完成。所以我为每个文件做以下操作
- 获取文件名
- 在熊猫中打开
- 将dataframe转换为numpy数组
- 检查数组是否值
这需要多处理,因为它不是i/o绑定。熊猫的东西和阵列转换需要时间。因此,我已经设置了脚本的多处理版本(请参见下文(:
问题是每个工作过程的内存消耗。尽管每个XLSX文件仅为100KB,但它在每个工作人员的2GB中连续达到峰值。我不明白为什么在处理新文件之前未发布内存。这样,我在处理文件列表之前就用完了内存。
问题似乎不是队列,而是熊猫的东西。
这是我的代码。它可以使用系统上的任何XLSX文件进行测试。
import pandas as pd
import multiprocessing as mp
import glob
path = r'c:temp'
fileFilter = 'serial.xlsx'
searchString = '804.486'
def searchFile(tasks, results, searchString):
"""Iterates over files in tasks and searches in file for the
occurence of 'searchString'.
Args:
-----
tasks: queue of strings
Files to look in
results: queue of strings
Files where the searchString was found
searchString: str
the string to be searched
"""
# for files in the queue
for task in iter(tasks.get, 'STOP'):
# read the filestructre into memory
xfile = pd.ExcelFile(task)
# iterate all sheets
for sheet in xfile.sheet_names[:3]:
# read the sheet
data = pd.read_excel(xfile, sheet)
# check if searchString is in numpy representation of dataframe
if searchString in data.values.astype(str):
# put filename in results queue
results.put(task)
break
xfile.close()
if __name__ == "__main__":
# get all files matching the filter that are in the root path
print('gathering files')
files = glob.glob(path + '**{}'.format(fileFilter), recursive=True)
# setup of queues and variables
n_proc = 2
tasks = mp.Queue()
results = mp.Queue()
print('Start processing')
# setup processes and start them
procs = [mp.Process(target=searchFile,
args=(tasks, results, searchString))
for x in range(n_proc)]
for p in procs:
p.daemon = True
p.start()
# populate queue
for file in files:
tasks.put(file)
for proc in procs:
tasks.put('STOP')
for p in procs:
p.join()
# print results
for result in range(results.qsize()):
print(results.get())
print('Done')
在GC中似乎有问题,在您永远不会离开的功能上下文中无法收集熊猫框架。您可以使用multiprocessing.Pool.map
,可以为您进行排队处理。每个项目都会要求工人功能,让GC完成工作。另外,您可以使用maxtasksperchild
池构造函数来限制工人处理的项目数量。
import glob
import multiprocessing
def searchFile(task, searchString):
xfile = pd.ExcelFile(task)
...
if found:
return task
if __name__ == '__main__':
files = glob.glob(path + '**{}'.format(fileFilter), recursive=True)
searchString = '804.486'
pool = multiprocessing.Pool(2, maxtasksperchild=10)
args = ((fname, searchString) for fname in files)
matchedFiles = filter(None, pool.map(searchFile, args))
pool.close()