Subprocess - number of threads



run.py

import subprocess
from multiprocessing.pool import ThreadPool

def work(repo, cpuid):
    my_tool_subprocess = subprocess.Popen('./scan.py {} {}'.format(repo, cpuid),
                                          shell=True, stdout=subprocess.PIPE)
    line = True
    while line:
        myline = my_tool_subprocess.stdout.readline()
        if "scan is done" in myline:
            break

num = 10  # set to the number of workers you want (it defaults to the cpu count of your machine)
tp = ThreadPool(num)
cpuid = 1
for repo in repos:
    tp.apply_async(work, (repo[0], "1-240"))
    print('Running {} at core {}'.format(repo[0], "1-240"))
tp.close()
tp.join()

scan.py

import subprocess

completed = subprocess.run(['git', 'clone', repo], env=my_env)
# ... a bunch of other subprocess.run() calls ...

# at the end:
print('returncode:', completed.returncode)
print('scan is done')

I expected the number of active processes to be 10 (10 threads), but somehow... it isn't. It seems it doesn't wait until "scan is done", which is the last statement of scan.py; instead it runs through the whole repository list (the for loop) and clones every repository. To repeat: it doesn't wait for the first 1-10 repositories to be cloned and processed (maintaining a moving window of 10 processes), it just goes on... spawning more processes and cloning more repositories.

Does anyone know what is going wrong here?
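One thing that can hide what is going on: tp.apply_async never raises in the calling thread. If work throws, the exception is stored on the AsyncResult that apply_async returns and is lost unless you call .get() on it, and the worker thread is immediately free to pick up the next repo. (Note also that on Python 3, readline() on a pipe opened without text mode returns bytes, so "scan is done" in myline raises a TypeError.) A sketch of how to make such failures visible, reusing work, tp, and repos from run.py above:

# Keep the AsyncResult handles instead of discarding them.
results = [tp.apply_async(work, (repo[0], '1-240')) for repo in repos]
tp.close()
for result in results:
    result.get()  # re-raises any exception raised inside work()
tp.join()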

Try restructuring your code like this:

In scan.py, move all the module-level code into a function, e.g.:

def run(repo, cpuid):
    # do whatever scan.py does given a repo path and cpuid;
    # instead of printing to stdout, have this return a value
    ...
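
For concreteness, a minimal sketch of what run() might look like, assuming the scan is essentially the git clone plus further commands from the original scan.py (the env=my_env mapping is omitted here for brevity):

import subprocess

def run(repo, cpuid):
    lines = []
    # Clone the repository, capturing output instead of printing it.
    completed = subprocess.run(['git', 'clone', repo],
                               capture_output=True, text=True)
    lines.append('returncode: {}'.format(completed.returncode))
    # ... run the rest of the scanning commands here the same way ...
    return '\n'.join(lines)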

If you still care about scan.py also having a command-line interface, add:

import argparse

def main(argv=None):
    parser = argparse.ArgumentParser()
    # implement command-line argument parsing here; the two arguments
    # used below are the obvious minimum:
    parser.add_argument('repo')
    parser.add_argument('cpuid')
    args = parser.parse_args(argv)
    value = run(args.repo, args.cpuid)
    print(value)

if __name__ == '__main__':
    main()
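
Taking argv as a parameter instead of always reading sys.argv keeps the entry point testable; for example (with a hypothetical repository URL):

import scan

# Equivalent to running: ./scan.py https://example.com/some/repo.git 1-240
scan.main(['https://example.com/some/repo.git', '1-240'])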

Now, in your run.py, do something like this:

import multiprocessing

import scan  # maybe give this a more specialized name

def work(args):
    repo, cpuid = args
    output = scan.run(repo, cpuid)
    for line in output.splitlines():
        pass  # Do whatever you want to do here...

def main():
    repos = ...  # You didn't show us where this comes from
    pool = multiprocessing.Pool()  # Or pass however many processes
    pool.map(work, [(r[0], '1-240') for r in repos])

if __name__ == '__main__':
    main()
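
If you specifically want the moving window of 10 workers from the original run.py, pass the size to the Pool explicitly; imap_unordered then yields as each scan finishes, and any exception raised inside work is re-raised in the parent when its result is consumed (a sketch, assuming repos has the same shape as above):

def main():
    repos = ...  # same source as before
    with multiprocessing.Pool(processes=10) as pool:
        for _ in pool.imap_unordered(work, [(r[0], '1-240') for r in repos]):
            pass  # each iteration corresponds to one finished scan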

Something like that. The point I am trying to make here is that if you structure your code sensibly, it makes the multiprocessing much simpler. Some of the details here are a bit opinionated, however.
