Printing from multiprocessing subprocesses



Hello!

I have a Python script that builds a list of files and processes it with multiprocessing: Pool.map plus a worker function. The worker function invokes an external executable via subprocess.check_call, and this external program prints some information to stdout.

My problem is reading that output: sometimes it is garbled and I cannot extract anything useful from it. I have read about print and multithreading in Python, but I don't think that is quite my issue, since I never call print explicitly in my script.

How can I fix this? Many thanks.

I also noticed that if I redirect the script's output to a file, the output is not garbled at all.

[UPDATE]:

Running the script as python mp.py > mp.log works fine:

import time, argparse
from multiprocessing import Pool
def f(x):
    cube = x*x*x
    print('|Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut %d|' % cube)
    return cube
if __name__ == '__main__':
    #file = open('log.txt', 'w+')
    parser = argparse.ArgumentParser(description='cube', usage='%(prog)s [options] -n')
    parser.add_argument('-n', action='store', help='number', dest='n', default='10000', metavar = '')
    args = parser.parse_args()
    pool = Pool()
    start = time.time()
    result = pool.map(f, range(int(args.n)))
    end = time.time()
    print (end - start)
    #file.close()

To avoid mixed output from multiple concurrent subprocesses, you could redirect each subprocess's output to a different file:

from multiprocessing.dummy import Pool # use threads
from subprocess import call
def run(i):
    with open('log%d.txt' % i, 'wb') as file:
        return call(["cmd", str(i)], stdout=file)
return_codes = Pool(4).map(run, range(10)) # run 10 subprocesses, 4 at a time
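If the per-subprocess files later need to become a single log, they can be concatenated once all workers have finished; each file's contents stay contiguous, so nothing interleaves. A small sketch, where merge_logs is a hypothetical helper and the log%d.txt names follow the snippet above:

```python
import glob
import re

def merge_logs(pattern='log*.txt', dest='combined.log'):
    # sort numerically so log10.txt comes after log9.txt, not after log1.txt
    def index(name):
        m = re.search(r'(\d+)', name)
        return int(m.group(1)) if m else -1
    with open(dest, 'wb') as out:
        for name in sorted(glob.glob(pattern), key=index):
            with open(name, 'rb') as src:
                out.write(src.read())
```

Call it only after Pool(...).map(...) has returned, so that every file is complete.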

Or collect the output and print it from a single thread in your code:

from functools import partial
from multiprocessing.dummy import Pool, Queue, Process # use threads
from subprocess import Popen, PIPE
def run(i, output):
    p = Popen(["cmd", str(i)], stdout=PIPE, bufsize=1)
    for line in iter(p.stdout.readline, b''):
        output((p.pid, line)) # collect the output 
    p.stdout.close()
    return p.wait()
def print_output(q):
    for pid, line in iter(q.get, None):
        print pid, line.rstrip()
q = Queue()
Process(target=print_output, args=[q]).start() # start printing thread
return_codes = Pool(4).map(partial(run, output=q.put_nowait),
                           range(10)) # run 10 subprocesses, 4 at a time
q.put(None) # exit printing thread
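For Python 3, roughly the same collect-and-print pattern can be sketched with concurrent.futures instead of multiprocessing.dummy; here sys.executable -c ... is a stand-in for the real external cmd:

```python
import sys, queue, threading
from concurrent.futures import ThreadPoolExecutor
from subprocess import Popen, PIPE

q = queue.Queue()

def run(i):
    # sys.executable -c ... stands in for the real external command
    p = Popen([sys.executable, '-c', 'print("line from %d")' % i], stdout=PIPE)
    for line in p.stdout:
        q.put((p.pid, line))          # hand each line to the printer thread
    p.stdout.close()
    return p.wait()

def printer():
    # a single thread does all the printing, so lines never interleave
    for pid, line in iter(q.get, None):
        print(pid, line.decode().rstrip())

t = threading.Thread(target=printer)
t.start()
with ThreadPoolExecutor(max_workers=4) as ex:
    return_codes = list(ex.map(run, range(10)))
q.put(None)                           # sentinel: tell the printer to exit
t.join()
```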

Or you could use a lock:

from __future__ import print_function
from multiprocessing.dummy import Pool, Lock # use threads
from subprocess import Popen, PIPE
def run(i, lock=Lock()):
    p = Popen(["cmd", str(i)], stdout=PIPE, bufsize=1)
    for line in iter(p.stdout.readline, b''):
        with lock:
            print(p.pid, line.rstrip())
    p.stdout.close()
    return p.wait()
return_codes = Pool(4).map(run, range(10)) # run 10 subprocesses, 4 at a time

Note: the print() function is used here to work around the issue described in "Why does a script that uses threads print extra lines occasionally?"

To avoid mixing lines from different subprocesses, you could collect the output in units larger than one line at a time, depending on what the output actually looks like.
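One way to collect "more than a line at a time" is to read each subprocess's entire output with communicate() and emit it under a lock as a single unit (again with sys.executable -c ... standing in for the real command). This buffers each subprocess's full output in memory, so it suits commands with modest output:

```python
import sys
from multiprocessing.dummy import Pool, Lock  # use threads
from subprocess import Popen, PIPE

def run(i, lock=Lock()):
    # stand-in for the real external command
    p = Popen([sys.executable, '-c', 'print("worker %d")' % i], stdout=PIPE)
    out, _ = p.communicate()          # whole output, read as one unit
    with lock:
        sys.stdout.write(out.decode())  # one locked write per subprocess
    return p.returncode

return_codes = Pool(4).map(run, range(10))  # run 10 subprocesses, 4 at a time
```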

Another fairly general solution that also uses unique files:

import time, argparse
from os import getcwd, getpid
from os.path import join
from multiprocessing import Pool, cpu_count
logger = None  # Will be set by init() to give a unique logger for each process in the pool
def init(*initargs):
    global logger
    # print(initargs)  # debug: show the initializer arguments
    lpath = getcwd() if initargs is None or len(initargs) == 0 else initargs[0]
    name = 'log{!s}'.format(str(getpid()))
    logger = open(join(lpath, name), mode='wt')  # Get logger with unique name

def f(x):
    global logger
    cube = x*x*x
    logger.write('|Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut {}|\n'.format(cube))
    logger.flush()
    return cube
if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='cube', usage='%(prog)s [options] -n')
    parser.add_argument('-n', action='store', help='number', dest='n', default='10000', metavar = '')
    args = parser.parse_args()
    pool = Pool(cpu_count(), init)
    start = time.time()
    result = pool.map(f, range(int(args.n)))
    end = time.time()
    print (end - start)
