我需要运行10,000个管道,每个管道由5个步骤(进程)组成
另一个需求是我想并发运行大约300个。意思是,我想要300个开始,然后对于完成5个步骤的每个管道,我想要启动一个新的管道。我找不到使用通道的方法。
一些初步的想法:首先将10,000个通道划分为300个项的缓冲区。但是当一个结束的时候,它对开始一个新的没有帮助…
proteins = Channel.fromPath( '/some/path/*.fa' ).buffer( size: 300 )
process A {
input:
file query_file from proteins
output:
}
process B {
}
您可以在您的nextflow.config
中使用以下命令实现大致您想要的:
process {
maxForks = 300
}
executor {
queueSize = 300
}
maxForks指令设置可以并行执行的进程实例的最大数量。通过在nextflow中设置此值。配置,我们确保它被全面应用于您的五个进程中的每一个。如果你有其他不想被这个指令覆盖的进程,你当然可以使用一个或多个进程选择器来选择你想要限制这个配置的进程。或者,只需将该指令添加到您的五个流程定义中的每一个。
执行器
queueSize
仅定义执行器将并行处理的任务数。
这当然不能保证在开始一个新块之前完成一个由五个进程组成的块,但这通常不是什么大问题。