我正在尝试并行运行一些Python函数,该函数在整个函数中都有打印命令。我想要的是让每个子进程运行相同的函数,以分组方式输出到主标准输出。我的意思是,我希望每个子进程的输出仅在完成任务后打印。但是,如果在此过程中发生某种错误,我仍希望输出子流程中所做的任何操作。
一个小例子:
from time import sleep
import multiprocessing as mp
def foo(x):
print('foo')
for i in range(5):
print('Process {}: in foo {}'.format(x, i))
sleep(0.5)
if __name__ == '__main__':
pool = mp.Pool()
jobs = []
for i in range(4):
job = pool.apply_async(foo, args=[i])
jobs.append(job)
for job in jobs:
job.wait()
这是并行运行的,但输出的是:
foo
Process 0: in foo 0
foo
Process 1: in foo 0
foo
Process 2: in foo 0
foo
Process 3: in foo 0
Process 1: in foo 1
Process 0: in foo 1
Process 2: in foo 1
Process 3: in foo 1
Process 1: in foo 2
Process 0: in foo 2
Process 2: in foo 2
Process 3: in foo 2
Process 1: in foo 3
Process 0: in foo 3
Process 3: in foo 3
Process 2: in foo 3
Process 1: in foo 4
Process 0: in foo 4
Process 3: in foo 4
Process 2: in foo 4
我想要的是:
foo
Process 3: in foo 0
Process 3: in foo 1
Process 3: in foo 2
Process 3: in foo 3
Process 3: in foo 4
foo
Process 1: in foo 0
Process 1: in foo 1
Process 1: in foo 2
Process 1: in foo 3
Process 1: in foo 4
foo
Process 0: in foo 0
Process 0: in foo 1
Process 0: in foo 2
Process 0: in foo 3
Process 0: in foo 4
foo
Process 2: in foo 0
Process 2: in foo 1
Process 2: in foo 2
Process 2: in foo 3
Process 2: in foo 4
任何一个进程的特定顺序都无关紧要,只要每个子进程的每个输出都组合在一起即可。有趣的是,如果我这样做,我会得到我想要的输出
python test.py > output
我知道每个子进程都没有自己的标准输出,而是使用主标准输出。我已经考虑并查找了一些解决方案,例如使它使我们使用队列,并且每个子进程都有自己的标准输出,然后在完成后,我们覆盖 flush 命令,以便我们可以将输出输出回队列。之后,我们可以阅读内容。但是,尽管这确实满足了我想要的,但如果函数中途停止,我将无法检索输出。它只会在成功完成时输出。从这里得到它 访问 python 中子进程的标准输出
我也看到了锁的使用,它有效,但它完全杀死了并行运行函数,因为它必须等待每个子进程执行函数 foo 的功能。
另外,如果可能的话,我想避免更改我的foo函数的实现,因为我有许多函数需要更改。
编辑:我已经研究了库dispy和并行python。Dispy 完全符合我的需求,它有一个单独的 stdout/stderr,我可以在最后打印出来,但 dispy 的问题在于我必须在单独的终端中手动运行服务器。我希望能够一次性运行我的 python 程序,而无需先打开另一个脚本。另一方面,并行Python也可以做我想要的,但它似乎缺乏对它的控制,以及一些烦人的麻烦。特别是当你打印出输出时,它也打印出函数的返回类型,我只想要我使用 print 打印出来的输出。此外,在运行函数时,您必须给它一个它使用的模块列表,这有点烦人,因为我不想仅仅为了运行一个简单的函数而拥有大量的导入列表。
正如你所注意到的,在这种情况下使用锁会杀死多处理,因为你基本上会让所有进程等待当前拥有STDOUT"权限"的进程的互斥释放。但是,并行运行并与函数/子流程同步打印在逻辑上是排他性的。
相反,您可以做的是让您的主进程充当子流程的"打印机",一旦您的子流程完成/错误,然后它才会将要打印的内容发送回您的主流程。您似乎完全满足于打印不是"实时的"(无论如何,如前所述,它也不能),因此这种方法应该为您提供恰到好处的服务。所以:
import multiprocessing as mp
import random # just to add some randomness
from time import sleep
def foo(x):
output = ["[Process {}]: foo:".format(x)]
for i in range(5):
output.append('[Process {}] in foo {}'.format(x, i))
sleep(0.2 + 1 * random.random())
return "n".join(output)
if __name__ == '__main__':
pool = mp.Pool(4)
for res in pool.imap_unordered(foo, range(4)):
print("[MAIN]: Process finished, response:")
print(res) # this will print as soon as one of the processes finishes/errors
pool.close()
这会给你(当然是 YMMV):
[MAIN]: Process finished, response:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] in foo 2
[Process 2] in foo 3
[Process 2] in foo 4
[MAIN]: Process finished, response:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[MAIN]: Process finished, response:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4
您可以以相同的方式观察其他任何内容,包括错误。
更新- 如果您绝对必须使用您无法控制其输出的函数,您可以包装您的子流程并捕获它们的 STDOUT/STDRR,然后一旦它们完成(或引发异常),您可以将所有内容返回给进程"管理器"以打印到实际的 STDOUT。通过这样的设置,我们可以有这样的foo()
:
def foo(x):
print("[Process {}]: foo:".format(x))
for i in range(5):
print('[Process {}] in foo {}'.format(x, i))
sleep(0.2 + 1 * random.random())
if random.random() < 0.0625: # let's add a 1/4 chance to err:
raise Exception("[Process {}] A random exception is random!".format(x))
return random.random() * 100 # just a random response, you can omit it
请注意,它幸福地没有意识到某些东西试图弄乱其操作模式。然后,我们将创建一个外部通用包装器(因此您不必根据函数的依赖性更改它)来实际弄乱其默认行为(不仅仅是这个函数,而是它在运行时可能调用的其他所有内容):
def std_wrapper(args):
try:
from StringIO import StringIO # ... for Python 2.x compatibility
except ImportError:
from io import StringIO
import sys
sys.stdout, sys.stderr = StringIO(), StringIO() # replace stdout/err with our buffers
# args is a list packed as: [0] process function name; [1] args; [2] kwargs; lets unpack:
process_name = args[0]
process_args = args[1] if len(args) > 1 else []
process_kwargs = args[2] if len(args) > 2 else {}
# get our method from its name, assuming global namespace of the current module/script
process = globals()[process_name]
response = None # in case a call fails
try:
response = process(*process_args, **process_kwargs) # call our process function
except Exception as e: # too broad but good enough as an example
print(e)
# rewind our buffers:
sys.stdout.seek(0)
sys.stderr.seek(0)
# return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
return sys.stdout.read(), sys.stderr.read(), response
现在我们所需要的只是调用这个包装器而不是所需的foo()
,并为其提供有关代表我们调用什么的信息:
if __name__ == '__main__':
pool = mp.Pool(4)
# since we're wrapping the process we're calling, we need to send to the wrapper packed
# data with instructions on what to call on our behalf.
# info on args packing available in the std_wrapper function above.
for out, err, res in pool.imap_unordered(std_wrapper, [("foo", [i]) for i in range(4)]):
print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
print(out.rstrip()) # remove the trailing space for niceness, print err if you want
pool.close()
所以现在如果你运行它,你会得到这样的东西:
[MAIN]: Process finished, response: None, STDOUT:
[Process 2]: foo:
[Process 2] in foo 0
[Process 2] in foo 1
[Process 2] A random exception is random!
[MAIN]: Process finished, response: 87.9658471743586, STDOUT:
[Process 1]: foo:
[Process 1] in foo 0
[Process 1] in foo 1
[Process 1] in foo 2
[Process 1] in foo 3
[Process 1] in foo 4
[MAIN]: Process finished, response: 38.929554421661194, STDOUT:
[Process 3]: foo:
[Process 3] in foo 0
[Process 3] in foo 1
[Process 3] in foo 2
[Process 3] in foo 3
[Process 3] in foo 4
[MAIN]: Process finished, response: None, STDOUT:
[Process 0]: foo:
[Process 0] in foo 0
[Process 0] in foo 1
[Process 0] in foo 2
[Process 0] in foo 3
[Process 0] in foo 4
[Process 0] A random exception is random!
尽管foo()
只是打印或出错。当然,您可以使用这样的包装器来调用任何函数并将任意数量的 args/kwarg 传递给它。
更新 #2- 但是等等!如果我们能像这样包装我们的函数进程,并捕获它们的 STDOUT/STDERR,我们肯定可以把它变成一个装饰器,并在我们的代码中用简单的装饰来使用它。所以,对于我的最终建议:
import functools
import multiprocessing
import random # just to add some randomness
import time
def std_wrapper(func):
@functools.wraps(func) # we need this to unravel the target function name
def caller(*args, **kwargs): # and now for the wrapper, nothing new here
try:
from StringIO import StringIO # ... for Python 2.x compatibility
except ImportError:
from io import StringIO
import sys
sys.stdout, sys.stderr = StringIO(), StringIO() # use our buffers instead
response = None # in case a call fails
try:
response = func(*args, **kwargs) # call our wrapped process function
except Exception as e: # too broad but good enough as an example
print(e) # NOTE: the exception is also printed to the captured STDOUT
# rewind our buffers:
sys.stdout.seek(0)
sys.stderr.seek(0)
# return everything packed as STDOUT, STDERR, PROCESS_RESPONSE | NONE
return sys.stdout.read(), sys.stderr.read(), response
return caller
@std_wrapper # decorate any function, it won't know you're siphoning its STDOUT/STDERR
def foo(x):
print("[Process {}]: foo:".format(x))
for i in range(5):
print('[Process {}] in foo {}'.format(x, i))
time.sleep(0.2 + 1 * random.random())
if random.random() < 0.0625: # let's add a 1/4 chance to err:
raise Exception("[Process {}] A random exception is random!".format(x))
return random.random() * 100 # just a random response, you can omit it
现在我们可以像以前一样调用包装的函数,而无需处理参数打包或类似的东西,所以我们回到了:
if __name__ == '__main__':
pool = multiprocessing.Pool(4)
for out, err, res in pool.imap_unordered(foo, range(4)):
print("[MAIN]: Process finished, response: {}, STDOUT:".format(res))
print(out.rstrip()) # remove the trailing space for niceness, print err if you want
pool.close()
输出与前面的示例相同,但在一个更好且易于管理的包中。