How do I parallelize a program to read and write large files in Python?

I'm trying to read and write data from a large file (~300 million lines, ~200 GB) with Python. I have the basic code working, but I'd like to parallelize it so it runs faster. To do that I've been following this guide: https://www.blopig.com/blog/2016/08/processing-large-files-using-python/. However, when I try to parallelize the code I get an error: "TypeError: worker() argument after * must be an iterable, not int". How can I get the code to run, and do you have any suggestions for improving efficiency? Note that I'm relatively new to Python.

Basic code (where id_pct1 and id_pct001 are already set):

with open(file1) as f, open('file1', 'w') as out_f1, open('file2', 'w') as out_f001:
    for line in f:
        data = line.split('*')
        if data[30] in id_pct1: out_f1.write(line)
        if data[30] in id_pct001: out_f001.write(line)

Parallel code:

import multiprocessing as mp

def worker(lineByte):
    with open(file1) as f, open('file1', 'w') as out_f1, open('file2', 'w') as out_f001:
        f.seek(lineByte)
        line = f.readline()
        data = line.split('*')
        if data[30] in id_pct1: out_f1.write(line)
        if data[30] in id_pct001: out_f001.write(line)

def main():
    pool = mp.Pool()
    jobs = []
    with open('Subsets/FirstLines.txt') as f:
        nextLineByte = 0
        for line in f:
            jobs.append(pool.apply_async(worker, (nextLineByte)))
            nextLineByte += len(line)
    for job in jobs:
        job.get()
    pool.close()

if __name__ == '__main__':
    main()

Try using

jobs.append(pool.apply_async(worker, (nextLineByte,)))

pool.apply_async() expects the worker arguments to be packed in an iterable.

(nextLineByte) is just an int wrapped in parentheses, which is what throws the error; the trailing comma in (nextLineByte,) is what turns it into a one-element tuple.
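
As a minimal illustration of the difference (the variable name here is arbitrary):

n = 42
print(type((n)))   # <class 'int'>   -- parentheses alone don't create a tuple
print(type((n,)))  # <class 'tuple'> -- the trailing comma does

Any iterable should work for the args parameter, since it is unpacked with * when the worker is called, so pool.apply_async(worker, [nextLineByte]) would also run.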
