假设我从shell 运行了以下命令
{
samtools view -HS header.sam; # command1
samtools view input.bam 1:1-50000000; # command2
} | samtools view -bS - > output.bam # command3
对于那些不熟悉samtools视图的人(因为这是stackoverflow)。这实际上是在创建一个具有新头的新bam文件。bam文件通常是大型压缩文件,因此在某些情况下,即使通过该文件也会很耗时。一种替代方法是执行command2,然后使用samtools再热器切换标头。这两次通过大文件。上面的命令一次通过bam,这对更大的bam文件来说很好(即使在压缩-WGS时,它们也会大于20GB)。
我的问题是如何使用子流程在python中实现这种类型的命令。
我有以下内容:
fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_0 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
sub_1 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
### SOMEHOW APPEND sub_1.stdout to sub_0.stdout
sub_2 = subprocess.Popen(params_2, stdin=appended.stdout, stdout=fh_bam)
非常感谢您的帮助。谢谢
如果字符串中已经有shell命令,则可以按原样运行:
#!/usr/bin/env python
from subprocess import check_call
check_call(r"""
{
samtools view -HS header.sam; # command1
samtools view input.bam 1:1-50000000; # command2
} | samtools view -bS - > output.bam # command3
""", shell=True)
在Python中模拟管道:
#!/usr/bin/env python
from subprocess import Popen, PIPE
# start command3 to get stdin pipe, redirect output to the file
with open('output.bam', 'wb', 0) as output_file:
command3 = Popen("samtools view -bS -".split(),
stdin=PIPE, stdout=output_file)
# start command1 with its stdout redirected to command3 stdin
command1 = Popen('samtools view -HS header.sam'.split(),
stdout=command3.stdin)
rc_command1 = command1.wait() #NOTE: command3.stdin is not closed, no SIGPIPE or a write error if command3 dies
# start command2 after command1 finishes
command2 = Popen('samtools view input.bam 1:1-50000000'.split(),
stdout=command3.stdin)
command3.stdin.close() # inform command2 if command3 dies (SIGPIPE or a write error)
rc_command2 = command2.wait()
rc_command3 = command3.wait()
Marco明确表示,这些命令会产生大量输出,大约为20GB。如果使用communicate(),它将等待进程终止,这意味着"fd"描述符将需要保存大量数据。在实践中,操作系统会在此期间将数据刷新到磁盘,除非您的计算机有超过20GB的可用RAM。因此,您最终将中间数据写入磁盘,而原作者希望避免这种情况。+1为sirlark的答案!
我认为,由于所涉及的文件的大小,在内存中连接前两个子流程的输出是不可行的。我建议将前两个子流程的输出封装在类似的文件中。看起来您只需要read方法,因为popen只会像这样从stdin文件中读取,而不会进行查找或写入。下面的代码假设从读取返回一个空字符串就足以指示流处于EOF
class concat(object):
def __init__(self, f1, f2):
self.f1 = f1
self.f2 = f2
def read(self, *args):
ret = self.f1.read(*args)
if ret == '':
ret = self.f2.read(*args)
return ret
fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_0 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
sub_1 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
### Somehow Append sub_1.stdout to sub_0.stdout
sub_2 = subprocess.Popen(params_2, stdin=concat(sub_0.stdout, sub_1.stdout), stdout=fh_bam)
为了澄清,f1.read
将阻塞,并且仅在管道关闭/EOF'd时返回''
。concat.read
只在发生这种情况后尝试从f2
读取,因此f1
和f2
的输出不会交织在一起。当然,重复读取f1
的末尾会有一些开销,这可以通过设置一个标志变量来指示从哪个文件读取来避免。不过,我怀疑它是否能显著提高表现。
虽然Popen接受类似文件的对象,但它实际上使用底层文件句柄/描述符进行通信,而不是像@J.F.Sebastian正确指出的那样使用文件对象的读写方法。更好的方法是使用不使用磁盘的管道(os.pipe()
)。这允许您将输出流直接连接到另一个进程的输入流,这正是您想要的。那么问题就只是串行化的问题,以确保两个源流不会交错。
import os
import subprocess
r, w = os.pipe()
fh_bam = open('output.bam', 'w')
params_0 = [ "samtools", "view", "-HS", "header.sam" ]
params_1 = [ "samtools", "view", "input.bam", "1:1-50000000"]
params_2 = [ "samtools", "view", "-bS", "-" ]
sub_sink = subprocess.Popen(params_2, stdin=r, stdout=fh_bam, bufsize=4096)
sub_src1 = subprocess.Popen(params_0, stderr=subprocess.PIPE, stdout=w, bufsize=4096)
sub_src1.communicate()
sub_src2 = subprocess.Popen(params_1, stderr=subprocess.PIPE, stdout=w, bufsize=4096)
sub_src2.communicate()
我们先打开接收器(管道的读取器),然后用源进程打开communicate
,只是为了避免@Ariel提到的潜在阻塞。这也迫使第一个源进程在第二个源进程有机会写入管道之前完成并在管道上刷新其输出,从而防止交错/阻塞输出。您可以使用bufsize值来调整性能。
这几乎正是shell命令正在执行的操作。