子进程或多线程或线程池



我打算使用Python多次并行运行某个命令行实用程序。
我知道多线程(multithreading)更适合I/O密集型操作,多进程(multiprocessing)更适合CPU密集型操作。

但是对于并行运行subprocess.run,我应该使用什么?
我也知道可以用subprocess模块自己创建一个进程池,但它与multiprocessing和threading模块提供的池有何不同?为什么我不应该直接把subprocess.run函数放到multiprocessing或threading的池中呢?

或者,是否存在一些标准,可以用来判断何时把运行实用程序的命令放入线程池或进程池会更好?

(在我的情况下,我将运行"ffmpeg"实用程序)

在这种情况下,我倾向于在ThreadPoolExecutor中运行子进程,主要是因为这样做很简单。

示例(来自这里):

from datetime import datetime
from functools import partial
import argparse
import concurrent.futures as cf
import logging
import os
import subprocess as sp
import sys
# Version string reported by the -v/--version command-line option.
__version__ = "2021.09.19"

def main():
    """
    Entry point for dicom2jpg.

    Parses the command line with setup(), then runs convert() for every
    file name on a thread pool and logs the outcome of each conversion.
    Threads are sufficient here because each worker only waits for an
    external process started by convert().

    Exits with status 1 when no file names were given.
    """
    args = setup()
    if not args.fn:
        logging.error("no files to process")
        sys.exit(1)
    if args.quality != 80:
        logging.info(f"quality set to {args.quality}")
    if args.level:
        logging.info("applying level correction.")
    # Bind the options that are the same for every file; only the file name
    # varies per task.
    convert_partial = partial(convert, quality=args.quality, level=args.level)
    # Format with second resolution explicitly. Slicing str(datetime.now())
    # by a fixed width would cut into the seconds whenever the microsecond
    # field happens to be zero (str() omits it in that case).
    starttime = datetime.now().isoformat(sep=" ", timespec="seconds")
    logging.info(f"started at {starttime}.")
    with cf.ThreadPoolExecutor(max_workers=os.cpu_count()) as tp:
        # map() yields results in the order of args.fn.
        for infn, outfn, rv in tp.map(convert_partial, args.fn):
            logging.info(f"finished conversion of {infn} to {outfn} (returned {rv})")
    endtime = datetime.now().isoformat(sep=" ", timespec="seconds")
    logging.info(f"completed at {endtime}.")

def setup():
    """Parse command-line arguments and prepare the program to run.

    Besides parsing, this configures the logging module according to the
    --log option and verifies that the external "convert" program can be
    started.

    Returns:
        The argparse namespace with the attributes log, level, quality
        and fn (the list of file names).

    Exits with status 1 when "convert" cannot be found.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "--log",
        default="warning",
        choices=["debug", "info", "warning", "error"],
        help="logging level (defaults to 'warning')",
    )
    parser.add_argument("-v", "--version", action="version", version=__version__)
    parser.add_argument(
        "-l",
        "--level",
        action="store_true",
        default=False,
        help="Correct color levels (default: no)",
    )
    parser.add_argument(
        "-q", "--quality", type=int, default=80, help="JPEG quality level (default: 80)"
    )
    parser.add_argument(
        "fn", nargs="*", metavar="filename", help="DICOM files to process"
    )
    args = parser.parse_args(sys.argv[1:])
    # The --log choices map directly onto the logging level constants
    # (logging.DEBUG, logging.INFO, ...).
    logging.basicConfig(
        level=getattr(logging, args.log.upper(), None),
        format="%(levelname)s: %(message)s",
    )
    logging.debug(f"command line arguments = {sys.argv}")
    logging.debug(f"parsed arguments = {args}")
    # Check for requisites: only the ability to start the program matters
    # here, so its output is discarded and its exit status is ignored.
    try:
        sp.run(["convert"], stdout=sp.DEVNULL, stderr=sp.DEVNULL)
        logging.info("found “convert”")
    except FileNotFoundError:
        logging.error("the program “convert” cannot be found")
        sys.exit(1)
    return args

def convert(filename, quality, level):
    """
    Convert a DICOM file to a JPEG file using the external "convert" program.

    The image is cropped to remove the blank areas from the Philips detector.

    Arguments:
        filename: name of the file to convert.
        quality: JPEG quality to apply.
        level: Boolean to indicate whether level adjustment should be done.

    Returns:
        Tuple of (input filename, output filename, convert return value)
    """
    geometry = "1574x2048"
    # The output name is derived from the whitespace-stripped input name.
    target = filename.strip() + ".jpg"
    command = ["convert", filename]
    command.extend(["-units", "PixelsPerInch"])
    command.extend(["-density", "300"])
    command.extend(["-depth", "8"])
    command.extend(["-crop", geometry + "+232+0"])
    command.extend(["-page", geometry + "+0+0"])
    command.append("-auto-gamma")
    command.extend(["-quality", str(quality)])
    if level:
        command.extend(["-level", "-35%,70%,0.5"])
    command.append(target)
    # Output of the external program is discarded; only its exit status is kept.
    completed = sp.run(command, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
    return (filename, target, completed.returncode)

# Run the conversion only when this file is executed as a script.
if __name__ == "__main__":
    main()

或者,您可以直接管理一组子进程(以Popen对象的形式),如下所示。(这是较旧的代码,现已针对Python 3进行了修改)

import os
import sys
import subprocess
from multiprocessing import cpu_count
from time import sleep

def checkfor(args):
    """Make sure that a program necessary for using this script is
    available.

    The program is started once with its output discarded. Only the
    ability to start it is checked: a non-zero exit status (for example
    from a tool that complains when run without arguments) still proves
    that the program exists.

    Arguments:
    args -- string or list of strings of commands. A single string may
            not contain spaces.

    Raises ValueError when a single string containing spaces is given.
    Prints a message and exits with status 1 when the program cannot be
    started.
    """
    if isinstance(args, str):
        if " " in args:
            raise ValueError("No spaces in single command allowed.")
        args = [args]
    try:
        # subprocess.DEVNULL is portable, unlike opening "/dev/null" by hand.
        subprocess.call(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    except OSError:
        # Raised when the executable is missing or cannot be executed.
        print("Required program '{}' not found! exiting.".format(args[0]))
        sys.exit(1)

def startconvert(fname):
    """Use the convert(1) program from the ImageMagick suite to convert the
    image and crop it.

    The conversion is started in the background; this function does not
    wait for it to finish. The result is written to ``fname + ".png"``.

    Arguments:
    fname -- name of the image file to convert.

    Returns a tuple (fname, Popen object of the running conversion).
    """
    size = "1574x2048"
    args = [
        "convert",
        fname,
        "-units",
        "PixelsPerInch",
        "-density",
        "300",
        "-crop",
        size + "+232+0",
        "-page",
        size + "+0+0",
        fname + ".png",
    ]
    # Discard the child's output via subprocess.DEVNULL. Opening "/dev/null"
    # without a mode yields a read-only handle, which is not a valid target
    # for stdout/stderr, and the path does not exist on every platform.
    p = subprocess.Popen(args, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    print("Start processing", fname)
    return (fname, p)

def manageprocs(proclist, delay=0.5):
    """Check a list of subprocesses for processes that have ended and
    remove them from the list.

    For every finished process a line is printed that reports success or
    the non-zero exit code. Afterwards the function sleeps so that callers
    polling in a loop do not spin.

    Arguments:
    proclist -- list of (filename, Popen) tuples; modified in place.
    delay -- seconds to sleep after the check (default 0.5).
    """
    # Iterate over a snapshot: removing items from the list that is being
    # iterated would skip the entry following each removed one.
    for item in list(proclist):
        fn, pr = item
        result = pr.poll()
        if result is None:
            # Still running; leave it in the list.
            continue
        proclist.remove(item)
        if result == 0:
            print("Finished processing", fn)
        else:
            s = "The conversion of {} exited with error code {}."
            print(s.format(fn, result))
    sleep(delay)

def main(argv):
    """Main program.

    Starts one conversion per file named on the command line, keeping at
    most cpu_count() conversions running at the same time, and returns
    once all of them have finished.

    Keyword arguments:
    argv -- command line arguments; argv[0] is the name of the script.
            The list is not modified.

    Prints a usage message and exits with status 0 when no files are given.
    """
    if len(argv) == 1:
        binary = os.path.basename(argv[0])
        print("Usage: {} [file ...]".format(binary))
        sys.exit(0)
    # Skip the script name by slicing; deleting argv[0] would alter the
    # caller's list (normally sys.argv itself).
    filenames = argv[1:]
    checkfor("convert")
    procs = []
    maxprocs = cpu_count()
    for ifile in filenames:
        # Wait for a free slot before starting the next conversion.
        while len(procs) >= maxprocs:
            manageprocs(procs)
        procs.append(startconvert(ifile))
    # Wait for the remaining conversions to finish.
    while procs:
        manageprocs(procs)

# Script entry point: pass the full command line to main() when this file
# is executed directly.
if __name__ == "__main__":
    main(sys.argv)

最新更新