Sorry if this is a duplicate of another question, but I have read other threads that try to use multiprocessing and I have to say they only left me more confused (I am a biologist trying to deal with lots of data and files on a server, and I am not very familiar with the proper terminology. My bad!).
What I basically want is to run a loop inside a script 5 times simultaneously, so that I can take advantage of the fact that the server has several CPUs. This would be simple if I did not have different combinations of arguments as input for this script. The script loops through the files in a folder (different samples in my experiment), creates output names based on the names of those files, and modifies the string that I submit to os.system to run a program. In my program call I also need to specify a different reference file for each sample, which I do by building a dictionary inside the script.
I call my script like this:
run_ProgramXPTO.py list.txt
where in list.txt I have something like this, specifying the path to the reference file for each sample file. Say I have 5 samples, so I have:
sampleA /path/to/reference/lion.reference
sampleB /path/to/reference/cat.reference
sampleC /path/to/reference/tiger.reference
sampleD /path/to/reference/cow.reference
sampleE /path/to/reference/dog.reference
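For reference, a mapping file in this format can be loaded into a {sample: reference_path} dictionary in one line. A minimal sketch (make_sample_map is a hypothetical helper name; it assumes exactly two whitespace-separated columns per line, and you would pass it open('list.txt')):

```python
# Build a {sample: reference_path} dict from lines like
# "sampleA /path/to/reference/lion.reference" (whitespace-separated).
def make_sample_map(lines):
    # line.split() yields the two columns; blank lines are skipped
    return dict(line.split() for line in lines if line.strip())
```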
Then, in this script, I add the necessary extension to the sample name, create an output name, and set an argument with the path to the reference. My call of the program is:
do_this_for_me -input sampleA_call.vcf.gz -reference /path/to/reference/lion.reference -output sampleA_call.stats
I tried to use multiprocessing to run this 5 times simultaneously, but what happens is that the same input files get processed 5 times, instead of the program running 5 times with different input files. So I am doing something wrong, and from searching the web I did not understand how to use multiprocessing...
So, this is what I have in run_ProgramXPTO.py:
import sys
import os
import glob
import multiprocessing

# this reads a file with paths to references
list = sys.argv[1]

# this makes a dictionary from the input file where for each sample
# I now have a path to another file (reference) in my system
def make_PathDir(list):
    list = open(list, "r")
    mydir = {}
    for line in list:
        row = line.strip().split('\t')
        key = row[0]
        value = row[1]
        mydir.setdefault(key, value)
    return mydir

# call the program specifying, for each input, an output name
# and the path to reference file
def worker(x):
    for i in x:
        name1 = i.strip("./")
        name2 = name1.strip("_call.vcf.gz")
        output = str(name2 + "_call.stats")
        path = PathDir.get(name2)
        command = "bcftools stats -F %s -s - %s > %s" % (path, name1, output)
        os.system(command)
    return

PathDir = make_PathDir(list)

# and here, run my program 5 times for each input file
if __name__ == '__main__':
    jobs = []
    for i in range(5):
        f = glob.glob("./*_call.vcf.gz")
        p = multiprocessing.Process(target=worker, args=[f])
        jobs.append(p)
        p.start()
Many thanks in advance.
A Python 3.2 solution (I missed the python 2.7 tag). If it has to be Python 2, we can modify this; in the meantime, it should give you the idea. It replaces some of your code with easier, more Pythonic ways of doing things.
#!/usr/bin/env python3
import sys
import os
import glob
import argparse
import functools
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor as PoolExecutor

NUM_CONCURRENT_WORKERS = 5

def process_sample(sample_to_reference_map, input_filename):
    """Run bcftools stats on input_filename using the correct reference file"""
    # Slice off the known suffix (str.rstrip would remove a set of
    # characters, not a literal suffix, and can eat part of the name)
    sample_basename = input_filename[:-len('_call.vcf.gz')]
    output_filename = '{}_call.stats'.format(sample_basename)
    reference_filename = sample_to_reference_map[sample_basename]
    command = 'bcftools stats -F {} -s - {} > {}'.format(
        reference_filename,
        input_filename,
        output_filename)
    os.system(command)

def process_args():
    parser = argparse.ArgumentParser(prog=sys.argv[0])
    parser.add_argument('sample_map')
    return parser.parse_args()

def main():
    args = process_args()
    # Read sample to reference mapping
    with open(args.sample_map) as f:
        sample_to_reference_map = dict(line.strip().split() for line in f)
    # Create a worker function that has the map passed to it
    worker = functools.partial(process_sample, sample_to_reference_map)
    # Use a pool of workers to process samples
    with PoolExecutor(max_workers=NUM_CONCURRENT_WORKERS) as executor:
        # Get a list of sample files to process
        input_files = glob.glob('*_call.vcf.gz')
        # Queue a background job for each file, and keep a job-to-sample
        # map for status
        future_to_sample = {executor.submit(worker, f): f for f in input_files}
        # Print messages for each as they finish
        for future in concurrent.futures.as_completed(future_to_sample):
            print('{} completed'.format(future_to_sample[future]))

if __name__ == '__main__':
    main()
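One name-handling pitfall worth flagging from the question's worker (name2 = name1.strip("_call.vcf.gz")): str.strip and str.rstrip treat their argument as a set of characters to remove, not a literal suffix, so they can silently eat part of a sample name. Slicing the suffix off is exact:

```python
# rstrip removes any trailing run of the listed *characters*, not a suffix:
ok = 'sampleA_call.vcf.gz'.rstrip('_call.vcf.gz')       # 'sampleA' (happens to work)
bad = 'sample_lag_call.vcf.gz'.rstrip('_call.vcf.gz')   # 'sample'  ('_lag' eaten too)

# Slicing off a known suffix is exact:
suffix = '_call.vcf.gz'
name = 'sample_lag_call.vcf.gz'
base = name[:-len(suffix)] if name.endswith(suffix) else name  # 'sample_lag'
```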