>假设我有如下所示的Python流处理代码:
def F1(stream):
    """Lazily apply f1 to every element of *stream*."""
    yield from map(f1, stream)
def F2(stream):
    """Lazily apply f2 to every element of *stream*."""
    yield from map(f2, stream)
def F3(stream):
    """Lazily apply f3 to every element of *stream*."""
    yield from map(f3, stream)
def F4(stream):
    """Lazily apply f4 to every element of *stream*."""
    yield from map(f4, stream)
# Compose the four stages into one lazy pipeline and drain it.
# Everything runs in a single process here — each yield pulls one
# item through all four generators in turn.
pipeline = F4(F3(F2(F1(range(1000000)))))
for value in pipeline:
    print(value)
这大致相当于 Unix 中的 `range 1000000 | F1 | F2 | F3 | F4`(假设存在这样一个
range 命令),但在 Unix 中,管道里的每个进程都是并行运行的。
有没有一种简单的方法来并行化这段 Python 代码?
你需要管道和黑魔法,Python两者都有。
from multiprocessing import Process, Pipe
def F1(stream):
    """Stage 1: yield each item of *stream* stringified with 'a' appended."""
    yield from (str(item) + 'a' for item in stream)
def F2(stream):
    """Stage 2: yield each item of *stream* with 'b' appended."""
    yield from (item + 'b' for item in stream)
def F3(stream):
    """Stage 3: yield each item of *stream* with 'c' appended."""
    yield from (item + 'c' for item in stream)
def F4(stream):
    """Stage 4: yield each item of *stream* with 'd' appended."""
    yield from (item + 'd' for item in stream)
class PIPE_EOF:
    """Sentinel sent through a pipe to mark end-of-stream.

    The class object itself is transmitted (classes pickle by reference),
    so the receiver can detect it with an equality check.
    """
class IterableConnection(object):
    """Adapter exposing the receiving end of a multiprocessing Pipe
    as a plain iterator.

    Iteration ends when the PIPE_EOF sentinel arrives or the pipe
    hits end-of-file (writer closed its end).
    """

    def __init__(self, pipe):
        # Receiving Connection object; must support .recv().
        self.pipe = pipe

    def __iter__(self):
        return self

    def __next__(self):
        try:
            item = self.pipe.recv()
        except EOFError:
            # Writer closed the pipe without sending the sentinel.
            raise StopIteration
        if item == PIPE_EOF:
            raise StopIteration
        return item

    def next(self):
        # Python 2 iterator-protocol alias.
        return self.__next__()
def parallel_generator_chain(*args, **kwargs):
    """Chain generator stages, running each stage in its own process.

    Each positional argument is a callable taking one iterable and
    returning an iterable (the first receives ``kwargs['data']``).
    Stages are connected by one-way pipes, so all stages run in
    parallel, like a Unix shell pipeline. Yields the items produced
    by the last stage.

    Requires the fork start method (the stage closures are not
    picklable under spawn).

    Raises:
        RuntimeError: if the mandatory ``data`` keyword is missing.
    """
    if 'data' in kwargs:
        data = kwargs['data']
    else:
        raise RuntimeError('Missing "data" argument.')

    def make_stage(func, _input, _output):
        # Runs in the child: pump func's output into the pipe, then
        # send the end-of-stream sentinel and release the write end.
        def wrapper():
            try:
                for item in func(_input):
                    _output.send(item)
                _output.send(PIPE_EOF)
            finally:
                _output.close()
        return wrapper

    procs = []
    for func in args:
        in_end, out_end = Pipe(duplex=False)
        p = Process(target=make_stage(func, data, out_end))
        p.start()
        # Close the parent's copy of the write end; the child keeps its
        # own. Without this, a crashed child would never produce the
        # EOFError that unblocks the reader.
        out_end.close()
        procs.append(p)
        data = IterableConnection(in_end)

    for output in data:
        yield output
    # Reap the children once the stream is fully drained.
    for p in procs:
        p.join()
# Python 2/3 shim: on Python 3 alias xrange to range. The original
# `'xrange' not in globals()` test is wrong on Python 2 as well —
# builtins never appear in globals(), so it would shadow the lazy
# builtin xrange with the list-building range (fatal at n=100000000).
try:
    xrange
except NameError:
    xrange = range
if __name__ == '__main__':
    # Drive the four-stage pipeline in parallel over a large range.
    # xrange is the source stage; each of F1..F4 runs in its own process.
    stream = parallel_generator_chain(xrange, F1, F2, F3, F4, data=100000000)
    for item in stream:
        print(item)
#for x in F4(F3(F2(F1(range(1000000))))):
# print(x)