Snakemake:一种具有分批输入和相应输出的规则



我有以下工作流的基本结构:

  • 文件从远程服务器下载
  • 本地转换,然后
  • 分析

其中一个分析很耗时,但如果一次在多个输入文件上运行,则可以很好地扩展。只要文件共享同一组设置,该规则的输出就与作为批处理一起分析的文件无关。上游和下游规则对单个文件进行操作,因此从工作流的角度来看,此规则是一个异类。可以提前告知要一起运行的文件,尽管理想情况下,如果在运行过程中未能产生一些输入,则应在减少的文件上运行该规则。

下面的例子说明了这个问题:

samples = [ 'a', 'b', 'c', 'd', 'e', 'f' ]
groups = {
'A': samples[0:3],
'B': samples[3:6]
}
rule all:
input:
expand("done/{sample}.txt", sample = samples)
rule create:
output:
"created/{sample}.txt"
shell:
"echo {wildcards.sample} > {output}"
rule analyze:
input:
"created/{sample}.txt"
output:
"analyzed/{sample}.txt"
params:
outdir = "analyzed/"
shell:
"""
sleep 1 # or longer
parallel md5sum {{}} > {params.outdir}/{{/}} ::: {input}
"""
rule finalize:
input:
"analyzed/{sample}.txt"
output:
"done/{sample}.txt"
shell:
"touch {output}"

规则analyze是根据groups中的分配从多个输入产生多个输出文件的规则。规则createfinalize分别对上游和下游的各个文件进行操作。

有没有办法实现这样的逻辑?我尽量避免为了适应这种不规则性而拆分工作流程。

注意:这个问题与这里听起来类似的问题无关。

如果我理解正确。规则analyze接收组A的输入文件created/a.txt, created/b.txt, created/c.txt并给出输出analyzed/a.txt, analyzed/b.txt, analyzed/c.txt。B组也是如此,所以规则analyze运行两次,其他规则运行6次。

如果是这样,我使规则analyze输出一个伪文件,表明组a(或B等(中的文件已被分析。下游规则将接受该伪文件的输入,并将找到相应的可用analyzed/{sample}.txt

这是你的例子:

samples = [ 'a', 'b', 'c', 'd', 'e', 'f' ]
groups = {
'A': samples[0:3],
'B': samples[3:6]
}
# Map samples to groups by inverting dict groups
inv_groups = {}
for x in samples:
for k in groups:
if x in groups[k]:
inv_groups[x] = k
rule all:
input:
expand("done/{sample}.txt", sample = samples)
rule create:
output:
"created/{sample}.txt"
shell:
"echo {wildcards.sample} > {output}"
rule analyze:
input:
# Collect input for this group (A, B, etc)
grp= lambda wc: ["created/%s.txt" % x for x in groups[wc.group]]
output:
done = touch('created/{group}.done'),
shell:
"""
# Code that actually does the job...
for x in {input.grp}
do
sn=`basename $x .txt`
touch analyzed/$sn.txt
done
"""
rule finalize:
input:
# Get dummy file for this {sample}. 
# If the dummy exists also the corresponding analyzed/{sample}.txt exists.
done = lambda wc: 'created/%s.done' % inv_groups[wc.sample],
output:
fout= "done/{sample}.txt"
params:
fin= "analyzed/{sample}.txt",
shell:
"cp {params.fin} {output.fout}"

最新更新