我需要编写一些文件,这些文件从两个不同且非常大的列表获取输入。以下python代码有效,但由于列表的大小和涉及的其他变量需要很长时间才能运行:
for n,seq in enumerate(ugFA):
with open("locusFASTAs/"+loci[n], 'a') as outFA:
SeqIO.write(ugSeqs[seq.id], outFA, 'fasta')
for m,i in enumerate(wantedContigs):
if f[m].id==seq.id:
SeqIO.write(MergeSeqs[i], outFA, 'fasta')
else: continue
上述代码中的数据结构:
- ugFA是一个列表
- 位点是一个列表
- ugSeqs是一本字典
- 通缉重叠群是一个列表
- f 是一个列表
- MergeSeqs是一个字典
我试图使用 multiprocessing
并行化代码。以下代码完成了这项工作,但 (i( 运行速度没有更快,(ii( 似乎没有超过 100% 的 CPU,以及 (iii( 完成后吐出如下所示的错误消息,即使它完成了循环中的任务:
def extractContigs(ugFA, loci, ugSeqs, wantedContigs, f, MergeSeqs):
from Bio import SeqIO
for n,seq in enumerate(ugFA):
with open("locusFASTAs/"+loci[n], 'a') as outFA:
SeqIO.write(ugSeqs[seq.id], outFA, 'fasta')
for m,i in enumerate(wantedContigs):
if f[m].id==seq.id:
SeqIO.write(MergeSeqs[i], outFA, 'fasta')
else: continue
pool = multiprocessing.Pool(processes=p)
r = pool.map(extractContigs(ugFA, loci, ugSeqs, wantedContigs, MergeSeqs))
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
TypeError: map() takes at least 3 arguments (2 given)
我在代码构建过程中做错了什么吗?我可以正确构建它以充分利用multiprocessing
模块的权宜之计吗?
问题是
r = pool.map(extractContigs(ugFA, loci, ugSeqs, wantedContigs, MergeSeqs))
调用函数extractContigs
(在主线程中,因此是 100% CPU(,然后将结果作为参数传递给 pool.map
。 正确的签名是
pool.map(func, iterable)
要使其在您的情况下起作用,您需要重写 extractContigs
函数以仅接受一个参数。 从外观上看,您需要对代码进行大量重构才能执行此操作。 同时写入同一文件可能是一个问题。
可以适当修改以前的版本:
def writeLocus(n):
seq = ugFa[n]
with open("locusFASTAs/"+loci[n], 'a') as outFA:
SeqIO.write(ugSeqs[seq.id], outFA, 'fasta')
for m,i in enumerate(wantedContigs):
if f[m].id==seq.id:
SeqIO.write(MergeSeqs[i], outFA, 'fasta')
else: continue
pool.map(writeLocus, range(len(ugFa)))
请确保验证输出中的内容不会由于并行写入相同的文件而出现乱码。 理想情况下,最好让每个工作线程写入自己的文件,然后合并。