Hello!
I have a Python script that builds a list of files and processes them in multiple processes via Pool.map and a worker function. The worker function runs an external executable by calling subprocess.check_call, and that executable prints some information to stdout.
The problem is reading this output — sometimes it is garbled and I can't get anything useful out of it. I have read about print and multithreading in Python, but I don't think that is quite my problem, since I never call print explicitly in the script.
How can I fix this? Many thanks.
I also noticed that if I redirect the script's output to a file, the output is not garbled at all.
[UPDATE]:
The script works fine if I run it as: python mp.py > mp.log
import time, argparse, threading, sys
from os import getenv
from multiprocessing import Pool

def f(x):
    cube = x*x*x
    print '|Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut %d|'%(cube)
    return cube

if __name__ == '__main__':
    #file = open('log.txt', 'w+')
    parser = argparse.ArgumentParser(description='cube', usage='%(prog)s [options] -n')
    parser.add_argument('-n', action='store', help='number', dest='n', default='10000', metavar='')
    args = parser.parse_args()
    pool = Pool()
    start = time.time()
    result = pool.map(f, range(int(args.n)))
    end = time.time()
    print (end - start)
    #file.close()
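For the asker's own script, one partial mitigation (a sketch, not a guaranteed fix) is to build each line as a single string and emit it with one write() call; a single write per line is much less likely to interleave than a multi-part print statement, though it does not strictly prevent it:

```python
import sys
from multiprocessing import Pool

def f(x):
    cube = x * x * x
    # build the whole line first, then emit it with a single write();
    # one write() call per line interleaves far less often than a
    # multi-part print statement, but is not an absolute guarantee
    sys.stdout.write('|cube of %d is %d|\n' % (x, cube))
    sys.stdout.flush()
    return cube

if __name__ == '__main__':
    print(Pool(4).map(f, range(8)))
```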
To avoid mixed-up output from several concurrent subprocesses, you could redirect each subprocess's output to a separate file:
from multiprocessing.dummy import Pool  # use threads
from subprocess import call

def run(i):
    with open('log%d.txt' % i, 'wb') as file:
        return call(["cmd", str(i)], stdout=file)

return_codes = Pool(4).map(run, range(10))  # run 10 subprocesses, 4 at a time
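Once each subprocess has written its own file, the per-task logs can be stitched back together in task order, so nothing is interleaved. A minimal sketch, assuming the log%d.txt naming pattern above (merge_logs is a hypothetical helper, not part of the answer):

```python
def merge_logs(n, combined='combined.log'):
    # concatenate log0.txt .. log{n-1}.txt, in task order,
    # into a single file with no interleaved lines
    with open(combined, 'wb') as out:
        for i in range(n):
            with open('log%d.txt' % i, 'rb') as part:
                out.write(part.read())
```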
Or collect the output and print it from a single thread in your code:
from functools import partial
from multiprocessing.dummy import Pool, Queue, Process  # use threads
from subprocess import Popen, PIPE

def run(i, output):
    p = Popen(["cmd", str(i)], stdout=PIPE, bufsize=1)
    for line in iter(p.stdout.readline, b''):
        output((p.pid, line))  # collect the output
    p.stdout.close()
    return p.wait()

def print_output(q):
    for pid, line in iter(q.get, None):
        print pid, line.rstrip()

q = Queue()
Process(target=print_output, args=[q]).start()  # start printing thread
return_codes = Pool(4).map(partial(run, output=q.put_nowait),
                           range(10))  # run 10 subprocesses, 4 at a time
q.put(None)  # exit printing thread
Or you could use a lock:
from __future__ import print_function
from multiprocessing.dummy import Pool, Lock  # use threads
from subprocess import Popen, PIPE

def run(i, lock=Lock()):
    p = Popen(["cmd", str(i)], stdout=PIPE, bufsize=1)
    for line in iter(p.stdout.readline, b''):
        with lock:
            print(p.pid, line.rstrip())
    p.stdout.close()
    return p.wait()

return_codes = Pool(4).map(run, range(10))  # run 10 subprocesses, 4 at a time
Note: the print() function is used here to avoid the issue described in: Why does a script that uses threads occasionally print extra lines?
To avoid mixing lines from different subprocesses, you could collect units larger than a single line, depending on the actual output.
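For example, the largest possible unit is the subprocess's entire output: capture it with communicate() and emit it in one go under a lock. A sketch, only suitable when each process's output fits in memory; run_whole is a hypothetical name and "cmd" is the same placeholder command as above:

```python
import sys
from multiprocessing.dummy import Pool, Lock  # use threads
from subprocess import Popen, PIPE

def run_whole(args, lock=Lock()):
    # capture the subprocess's complete output first ...
    p = Popen(args, stdout=PIPE)
    out, _ = p.communicate()
    # ... then emit it as one unit under the lock, so output from
    # different subprocesses can never interleave
    with lock:
        sys.stdout.write(out.decode())
        sys.stdout.flush()
    return p.returncode

# usage, mirroring the answer's examples:
# return_codes = Pool(4).map(run_whole, [["cmd", str(i)] for i in range(10)])
```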
Another fairly general solution, which also uses unique files:
import time, argparse, threading, sys
from os import getenv, getcwd, getpid
from os.path import join
from multiprocessing import Pool, cpu_count

logger = None  # will be set by init() to give a unique logger for each process in the pool

def init(*initargs):
    global logger
    print(initargs)
    lpath = getcwd() if initargs is None or len(initargs) == 0 else initargs[0]
    name = 'log{!s}'.format(str(getpid()))
    logger = open(join(lpath, name), mode='wt')  # get a logger with a unique name

def f(x):
    global logger
    cube = x*x*x
    logger.write('|Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut {}|\n'.format(cube))
    logger.flush()
    return cube

if __name__ == '__main__':
    #file = open('log.txt', 'w+')
    parser = argparse.ArgumentParser(description='cube', usage='%(prog)s [options] -n')
    parser.add_argument('-n', action='store', help='number', dest='n', default='10000', metavar='')
    args = parser.parse_args()
    pool = Pool(cpu_count(), init)
    start = time.time()
    result = pool.map(f, range(int(args.n)))
    end = time.time()
    print (end - start)
    #file.close()