并行化一系列生成器



假设我有如下所示的 Python 流处理代码：

def F1(stream):
    """Lazily yield ``f1(item)`` for each item of *stream*."""
    for item in stream:
        result = f1(item)
        yield result
def F2(stream):
    """Lazily yield ``f2(item)`` for each item of *stream*."""
    for item in stream:
        result = f2(item)
        yield result
def F3(stream):
    """Lazily yield ``f3(item)`` for each item of *stream*."""
    for item in stream:
        result = f3(item)
        yield result
def F4(stream):
    """Lazily yield ``f4(item)`` for each item of *stream*."""
    for item in stream:
        result = f4(item)
        yield result

# Sequential version: the four generator stages are chained by plain calls,
# so every stage runs in this one process. Each item is pulled through
# F1 -> F2 -> F3 -> F4 before the next item is started.
for x in F4(F3(F2(F1(range(1000000))))):
    print(x)

这大致相当于 Unix 中的 `range 1000000 | F1 | F2 | F3 | F4`（假设存在一个 `range` 命令），但在 Unix 中，管道里的每个进程都是并行运行的。

有没有一种简单的方法来并行化这段 Python 代码？

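你需要管道和一点黑魔法，而 Python 两者都有。下面的实现为每个生成器启动一个独立的进程（`multiprocessing.Process`），用单向管道（`multiprocessing.Pipe`）把相邻的阶段连接起来，并在主进程中逐项产出最后一个阶段的输出。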

from multiprocessing import Process, Pipe

def F1(stream):
    """Yield each item of *stream* converted with ``str`` and suffixed with 'a'."""
    for item in stream:
        text = str(item)
        yield text + 'a'
def F2(stream):
    """Yield each item of *stream* with 'b' appended via ``+``."""
    suffix = 'b'
    for item in stream:
        yield item + suffix
def F3(stream):
    """Yield each item of *stream* with 'c' appended via ``+``."""
    suffix = 'c'
    for item in stream:
        yield item + suffix
def F4(stream):
    """Yield each item of *stream* with 'd' appended via ``+``."""
    suffix = 'd'
    for item in stream:
        yield item + suffix

class PIPE_EOF:
    """End-of-stream marker sent down a pipe after a stage's last item.

    The class object itself is the marker and is never instantiated.
    Classes are pickled by reference, so the receiver gets back its own
    ``PIPE_EOF`` object and can test for it by identity.
    """


class IterableConnection(object):
    """Iterate over the items received from a connection.

    Iteration ends when the ``PIPE_EOF`` marker arrives, or when ``recv()``
    raises ``EOFError`` (the sending end was closed without a marker).
    """

    def __init__(self, pipe):
        # Presumably a multiprocessing Connection; only ``recv()`` is used.
        self.pipe = pipe

    def __iter__(self):
        return self

    def __next__(self):
        try:
            ret = self.pipe.recv()
        except EOFError:
            # Writer went away without sending the marker: treat as end.
            ret = PIPE_EOF
        # Identity, not ``==``: equality would invoke the item's own __eq__,
        # which can report a false match or (for array-like items) return an
        # array whose truth value raises ValueError.
        if ret is PIPE_EOF:
            raise StopIteration
        return ret

    def next(self):
        """Python 2 spelling of ``__next__``."""
        return self.__next__()

def _run_stage(func, source, sink):
    """Body of one pipeline-stage process.

    Calls ``func(source)``, sends every item it yields through *sink*, then
    sends the ``PIPE_EOF`` marker.  *sink* is closed in every case, so the
    downstream reader sees EOF even if ``func`` raises part-way through.

    This is a module-level function rather than a closure so that it can be
    pickled as a ``Process`` target under the "spawn" and "forkserver" start
    methods.
    """
    try:
        for item in func(source):
            sink.send(item)
        sink.send(PIPE_EOF)
    finally:
        sink.close()


def parallel_generator_chain(*args, **kwargs):
    """Run a chain of generator functions, each stage in its own process.

    ``parallel_generator_chain(g1, g2, g3, data=d)`` yields the items of
    ``g3(g2(g1(d)))``.  Each stage runs in a separate
    ``multiprocessing.Process``, and adjacent stages are connected by a
    one-way ``Pipe``.

    Args:
        *args: Stage callables in pipeline order.  The first is called with
            *data*; each later one is called with an iterator over the
            previous stage's output.  Each must return an iterable.  Under
            the "spawn"/"forkserver" start methods they must be picklable
            (for example, module-level functions or builtins).
        data: Required keyword argument; the input of the first stage.  With
            no stages, *data* itself is iterated.  Every item a stage
            produces travels through a pipe, so it must be picklable.

    Yields:
        The items produced by the last stage, in order.  If a stage raises,
        the stream ends at that point instead of hanging.

    Raises:
        RuntimeError: If ``data`` is missing.  Because this is a generator,
            the error surfaces on the first iteration.
    """
    if 'data' not in kwargs:
        raise RuntimeError('Missing "data" argument.')
    data = kwargs['data']
    processes = []
    upstream = None  # parent's copy of the reader that feeds the next stage
    finished = False
    try:
        for func in args:
            reader, writer = Pipe(duplex=False)
            proc = Process(target=_run_stage, args=(func, data, writer))
            proc.start()
            processes.append(proc)
            # The child now holds its own copies.  Close the parent's so that
            # EOF / BrokenPipe propagate when a stage exits, and so later
            # forked stages do not inherit these descriptors.
            writer.close()
            if upstream is not None:
                upstream.close()
            upstream = reader
            data = IterableConnection(reader)
        for output in data:
            yield output
        finished = True
    finally:
        if upstream is not None:
            upstream.close()
        for proc in processes:
            if not finished:
                # Consumer stopped early or an error occurred: upstream stages
                # may be blocked on send() forever, so stop them.
                proc.terminate()
            proc.join()

# Python 2/3 compatibility: keep the lazy builtin ``xrange`` when it exists
# (Python 2), otherwise alias it to ``range``, which is lazy on Python 3.
# Probe the name rather than ``globals()``: builtins are not module globals,
# so a ``globals()`` check would always replace Python 2's lazy xrange with
# its list-building ``range``.
try:
    xrange
except NameError:
    xrange = range

if __name__ == '__main__':
    # Parallel pipeline: xrange(100000000) -> F1 -> F2 -> F3 -> F4, with each
    # stage started in its own Process by parallel_generator_chain.  The
    # __main__ guard keeps child processes from re-running this loop when
    # they import this module.
    for x in parallel_generator_chain(xrange, F1, F2, F3, F4, data=100000000):
        print(x)
# Sequential (single-process) version, kept for comparison.  Note that it
# uses a smaller input range than the parallel run above.
#for x in F4(F3(F2(F1(range(1000000))))):
#    print(x)

相关内容

  • 没有找到相关文章

最新更新