>我有相当大的csv文件,我需要逐行操作/修改(因为每一行可能需要不同的修改规则(,然后将它们写成另一个格式正确的csv。
目前,我有:
import multiprocessing
def read(buffer):
pool = multiprocessing.Pool(4)
with open("/path/to/file.csv", 'r') as f:
while True:
lines = pool.map(format_data, f.readlines(buffer))
if not lines:
break
yield lines
def format_data(row):
row = row.split(',') # Because readlines() returns a string
# Do formatting via list comprehension
return row
def main():
buf = 65535
rows = read(buf)
with open("/path/to/new.csv",'w') as out:
writer = csv.writer(f, lineterminator='n')
while rows:
try:
writer.writerows(next(rows))
except StopIteration:
break
即使我通过map
使用多处理并使用生成器防止内存过载,我仍然需要 2 分钟多来处理 40,000 行。老实说,它不应该花那么多。我什至从生成器输出生成了一个嵌套列表,并尝试一次将数据写入一个大文件,而不是逐块方法,但仍然需要很长时间。我在这里做错了什么?
我已经想通了。
首先,问题出在我的format_data()
职能上。它正在调用数据库连接,每次运行时,它都会构造数据库连接并在每次迭代时关闭它。
我通过字典为支持多线程的指数级更快的查找表创建基本映射来修复它。
所以,我的代码看起来像这样:
import multiprocessing
def read(buffer):
pool = multiprocessing.Pool(4)
with open("/path/to/file.csv", 'r') as f:
while True:
lines = pool.map(format_data, f.readlines(buffer))
if not lines:
break
yield lines
def format_data(row):
row = row.split(',') # Because readlines() returns a string
# Do formatting via list comprehension AND a dictionary lookup
# vice a database connection
return row
def main():
rows = read(1024*1024)
with open("/path/to/new.csv",'w') as out:
while rows:
try:
csv.writer(f, lineterminator='n').writerows(next(rows))
except StopIteration:
break
我能够在不到 150 秒的时间内解析一个 ~30MB 的文件。这里学到了一些经验教训,希望其他人能从中吸取教训。