我有两个相当大的文件,JSON(185,000行)和CSV(650,000)。我需要遍历 JSON 文件中的每个字典,然后在该字典中循环访问part_numbers
中的每个部分并对其进行比较,以获取该部分在 CSV 中找到的前三个字母。
出于某种原因,我很难正确地做到这一点。我的脚本的第一个版本太慢了,所以我正在尝试加快速度
JSON示例:
[
{"category": "Dryer Parts", "part_numbers": ["ABC", "DEF", "GHI", "JKL", "MNO", "PQR"], "parent_category": "Dryers"},
{"category": "Washer Parts", "part_numbers": ["ABC", "DEF", "GHI", "JKL", "MNO", "PQR"], "parent_category": "Washers"},
{"category": "Sink Parts", "part_numbers": ["ABC", "DEF", "GHI", "JKL", "MNO", "PQR"], "parent_category": "Sinks"},
{"category": "Other Parts", "part_numbers": ["ABC", "DEF", "GHI", "JKL", "MNO", "PQR"], "parent_category": "Others"}
]
CSV:
WCI|ABC
WPL|DEF
BSH|GHI
WCI|JKL
结束字典如下所示:
{"category": "Other Parts",
"part_numbers": ["WCIABC","WPLDEF","BSHGHI","JKLWCI"...]}
这是我到目前为止所做的示例,尽管它在if (part.rstrip() == row[1]):
返回IndexError: list index out of range
:
import csv
import json
from multiprocessing import Pool
def find_part(item):
data = {
'parent_category': item['parent_category'],
'category': item['category'],
'part_numbers': []
}
for part in item['part_numbers']:
for row in reader:
if (part.rstrip() == row[1]):
data['part_numbers'].append(row[0] + row[1])
with open('output.json', 'a') as outfile:
outfile.write(' ')
json.dump(data, outfile)
outfile.write(',n')
if __name__ == '__main__':
catparts = json.load(open('catparts.json', 'r'))
partfile = open('partfile.csv', 'r')
reader = csv.reader(partfile, delimiter='|')
with open('output.json', 'w+') as outfile:
outfile.write('[n')
p = Pool(50)
p.map(find_part, catparts)
with open('output.json', 'a') as outfile:
outfile.write('n]')
正如我在评论中所说,您的代码(现在)给了我一个NameError: name 'reader'
find_part()
函数中没有定义。解决方法是将csv.reader
的创建移动到函数中。我还更改了文件的打开方式,以使用with
上下文管理器和newline
参数。这也解决了一堆单独的任务都试图同时读取同一个 csv 文件的问题。
您的方法效率非常低,因为它读取item['part_numbers']
中每个部分的整个'partfile.csv'
文件。尽管如此,以下方法似乎有效:
import csv
import json
from multiprocessing import Pool
def find_part(item):
data = {
'parent_category': item['parent_category'],
'category': item['category'],
'part_numbers': []
}
for part in item['part_numbers']:
with open('partfile.csv', newline='') as partfile: # open csv in Py 3.x
for row in csv.reader(partfile, delimiter='|'):
if part.rstrip() == row[1]:
data['part_numbers'].append(row[0] + row[1])
with open('output.json', 'a') as outfile:
outfile.write(' ')
json.dump(data, outfile)
outfile.write(',n')
if __name__ == '__main__':
catparts = json.load(open('carparts.json', 'r'))
with open('output.json', 'w+') as outfile:
outfile.write('[n')
p = Pool(50)
p.map(find_part, catparts)
with open('output.json', 'a') as outfile:
outfile.write(']')
这是一个效率更高的版本,每个子进程仅读取一次整个'partfile.csv'
文件:
import csv
import json
from multiprocessing import Pool
def find_part(item):
data = {
'parent_category': item['parent_category'],
'category': item['category'],
'part_numbers': []
}
with open('partfile.csv', newline='') as partfile: # open csv for reading in Py 3.x
partlist = [row for row in csv.reader(partfile, delimiter='|')]
for part in item['part_numbers']:
part = part.rstrip()
for row in partlist:
if row[1] == part:
data['part_numbers'].append(row[0] + row[1])
with open('output.json', 'a') as outfile:
outfile.write(' ')
json.dump(data, outfile)
outfile.write(',n')
if __name__ == '__main__':
catparts = json.load(open('carparts.json', 'r'))
with open('output.json', 'w+') as outfile:
outfile.write('[n')
p = Pool(50)
p.map(find_part, catparts)
with open('output.json', 'a') as outfile:
outfile.write(']')
虽然您可以将'partfile.csv'
数据读入主任务的内存中,并将其作为参数传递给find_part()
子任务,但这样做只意味着必须为每个进程提取和取消酸洗数据。您需要运行一些计时测试来确定这是否比使用csv
模块显式读取它更快,如上所示。
另请注意,在将任务提交到Pool
之前,从'carparts.json'
文件预处理数据加载并从每行的第一部分去除尾随空格也会更有效,因为这样您就不需要一遍又一遍地find_part()
part = part.rstrip()
。同样,我不知道这样做是否值得付出努力——只有时间测试才能确定答案。
我想我找到了。 您的CSV阅读器与许多其他文件访问方法类似:按顺序读取文件,然后点击EOF。 当您尝试对第二部分执行相同操作时,文件已处于 EOF,并且第一次read
尝试返回空结果;这没有第二个要素。
如果要再次访问所有记录,则需要重置文件书签。 最简单的方法是使用
partfile.seek(0)
另一种方法是关闭并重新打开文件。
这会让你动起来吗?
只要 csv 中存在所有部件号,这应该有效。
import json
# read part codes into a dictionary
with open('partfile.csv') as fp:
partcodes = {}
for line in fp:
code, number = line.strip().split('|')
partcodes[number] = code
with open('catparts.json') as fp:
catparts = json.load(fp)
# modify the part numbers/codes
for cat in catparts:
cat['part_numbers'] = [partcodes[n] + n for n in cat['part_numbers']]
# output
with open('output.json', 'w') as fp:
json.dump(catparts, fp)