Python multiprocessing with a generator



I am trying to process a file (each line is a JSON document). The size of the file can go from 100s of MB to GBs, so I wrote generator code to fetch each document from the file line by line:

import codecs
import json

def jl_file_iterator(file):
    with codecs.open(file, 'r', 'utf-8') as f:
        for line in f:
            document = json.loads(line)
            yield document

My system has 4 cores, so I would like to process 4 lines of the file in parallel. Currently I have this code, which takes 4 lines at a time and calls my code for parallel processing:

threads = 4
files, i = [], 1
for jl in jl_file_iterator(input_path):
    files.append(jl)
    if i % (threads) == 0:
        # pool.map(processFile, files)
        parallelProcess(files, o)
        files = []
    i += 1
if files:
    parallelProcess(files, o)
    files = []

This is my code where the actual processing happens:

from multiprocessing import Process

def parallelProcess(files, outfile):
    processes = []
    for i in range(len(files)):
        p = Process(target=processFile, args=(files[i],))
        processes.append(p)
        p.start()
    for i in range(len(files)):
        processes[i].join()

def processFile(doc):
    extractors = {}
    # ... do some processing on doc
    o.write(json.dumps(doc) + '\n')

As you can see, I wait for all 4 lines to finish processing before I send the next 4 for processing. But what I want is this: as soon as one process finishes processing its line, start the next line on the processor that just freed up. How do I do that?

PS: The problem is that, because it is a generator, I cannot load all the files and use something like map to run the processes.

Thanks for your help.

As @pvg said in a comment, a (bounded) queue is the natural way to mediate between a producer and consumers that run at different speeds, ensuring they all stay as busy as possible while keeping the producer from getting way ahead.

Here is a self-contained, executable example. The queue is restricted to a maximum size equal to the number of worker processes. If the consumers run much faster than the producer, it can make good sense to let the queue grow larger than that.

In your specific case, it would probably make sense to pass the raw lines to the consumers and let them do the document = json.loads(line) part in parallel.

import multiprocessing as mp
NCORE = 4
def process(q, iolock):
    from time import sleep
    while True:
        stuff = q.get()
        if stuff is None:
            break
        with iolock:
            print("processing", stuff)
        sleep(stuff)
if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock))
    for stuff in range(20):
        q.put(stuff)  # blocks until q below its max size
        with iolock:
            print("queued", stuff)
    for _ in range(NCORE):  # tell workers we're done
        q.put(None)
    pool.close()
    pool.join()
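
Adapting that pattern to the question's JSON-lines file, a minimal sketch could look like the following (the file name is hypothetical, and the print is a stand-in for the question's processFile; as suggested above, the raw line is sent to the workers so that json.loads runs in parallel):

import json
import multiprocessing as mp

NCORE = 4

def worker(q, iolock):
    # Each worker pulls raw lines from the queue, parses them, and processes the document.
    while True:
        line = q.get()
        if line is None:          # end-signal from the producer
            break
        document = json.loads(line)
        with iolock:
            print("processing document")  # stand-in for processFile(document)

if __name__ == '__main__':
    q = mp.Queue(maxsize=NCORE)
    iolock = mp.Lock()
    pool = mp.Pool(NCORE, initializer=worker, initargs=(q, iolock))
    with open('input.jl', 'r', encoding='utf-8') as f:  # hypothetical file name
        for line in f:
            q.put(line)           # blocks once NCORE lines are waiting
    for _ in range(NCORE):        # tell workers we're done
        q.put(None)
    pool.close()
    pool.join()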

So I finally got this running successfully, by creating chunks of lines from my file and processing the lines in parallel. Posting it here so it may be useful to somebody in the future.

def run_parallel(self, processes=4):
    processes = int(processes)
    pool = mp.Pool(processes)
    try:
        jobs = []
        # run for chunks of files
        for chunkStart, chunkSize in self.chunkify(self.input_file):
            jobs.append(pool.apply_async(self.process_wrapper, (chunkStart, chunkSize)))
        for job in jobs:
            job.get()
        pool.close()
    except Exception as e:
        print(e)

def process_wrapper(self, chunkStart, chunkSize):
    # Open in binary mode so the byte offsets produced by chunkify line up exactly
    with open(self.input_file, 'rb') as f:
        f.seek(chunkStart)
        lines = f.read(chunkSize).splitlines()
        for line in lines:
            document = json.loads(line)
            self.process_file(document)

# Splitting data into chunks for parallel processing
def chunkify(self, filename, size=1024*1024):
    fileEnd = os.path.getsize(filename)
    # Binary mode is required for the relative seek below
    with open(filename, 'rb') as f:
        chunkEnd = f.tell()
        while True:
            chunkStart = chunkEnd
            f.seek(size, 1)   # jump roughly `size` bytes ahead
            f.readline()      # then advance to the next line boundary
            chunkEnd = f.tell()
            yield chunkStart, chunkEnd - chunkStart
            if chunkEnd > fileEnd:
                break
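
For context, here is one way the three functions above could be wired up, since they are written as methods expecting self. This is only a sketch: the class name, file name and process_file body are hypothetical, and it assumes run_parallel, process_wrapper and chunkify are defined at module level in the same file, together with the json, multiprocessing and os imports they use:

import json
import multiprocessing as mp
import os

class JLChunkProcessor:
    def __init__(self, input_file):
        self.input_file = input_file

    def process_file(self, document):
        print(document)  # placeholder for the real per-document work

# Attach the functions defined above as methods of the class.
JLChunkProcessor.run_parallel = run_parallel
JLChunkProcessor.process_wrapper = process_wrapper
JLChunkProcessor.chunkify = chunkify

if __name__ == '__main__':
    JLChunkProcessor('input.jl').run_parallel(processes=4)  # hypothetical file name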

Tim Peters' answer is great.
But my specific case was slightly different, and I had to modify his answer to fit my needs. Referencing it here.
This answers @CpILL's question in the comments.


In my case, I used a chain of generators (to create a pipeline).
Within this chain of generators, one of them was doing heavy computation, slowing down the whole pipeline.

Something like this:

def fast_generator1():
    for line in file:
        yield line
def slow_generator(lines):
    for line in lines:
        yield heavy_processing(line)
def fast_generator2(lines):
    for line in lines:
        yield fast_func(line)
if __name__ == "__main__":
    lines = fast_generator1()
    lines = slow_generator(lines)
    lines = fast_generator2(lines)
    for line in lines:
        print(line)

To make it faster, we have to run the slow generator in multiple processes.
The modified code looks like this:

import multiprocessing as mp
NCORE = 4
def fast_generator1():
    for line in file:
        yield line
def slow_generator(lines):
    def gen_to_queue(input_q, lines):
        # This function simply consume our generator and write it to the input queue
        for line in lines:
            input_q.put(line)
        for _ in range(NCORE):    # Once generator is consumed, send end-signal
            input_q.put(None)
    def process(input_q, output_q):
        while True:
            line = input_q.get()
            if line is None:
                output_q.put(None)
                break
            output_q.put(heavy_processing(line))

    input_q = mp.Queue(maxsize=NCORE * 2)
    output_q = mp.Queue(maxsize=NCORE * 2)
    # Here we need 3 groups of worker :
    # * One that will consume the input generator and put it into a queue. It will be `gen_pool`. It's ok to have only 1 process doing this, since this is a very light task
    # * One that do the main processing. It will be `pool`.
    # * One that read the results and yield it back, to keep it as a generator. The main thread will do it.
    gen_pool = mp.Pool(1, initializer=gen_to_queue, initargs=(input_q, lines))
    pool = mp.Pool(NCORE, initializer=process, initargs=(input_q, output_q))
    finished_workers = 0
    while True:
        line = output_q.get()
        if line is None:
            finished_workers += 1
            if finished_workers == NCORE:
                break
        else:
            yield line
def fast_generator2(lines):
    for line in lines:
        yield fast_func(line)
if __name__ == "__main__":
    lines = fast_generator1()
    lines = slow_generator(lines)
    lines = fast_generator2(lines)
    for line in lines:
        print(line)

With this implementation, we have a multiprocess generator: it is used exactly like any other generator (as in the first example of this answer), but all the heavy computation is done with multiprocessing, speeding it up!

You can also use the higher-level concurrent.futures module, like this:

import concurrent.futures as cf
import time
from concurrent.futures import ProcessPoolExecutor
def read(infile):
    with open(infile, "r") as f:
        for line in f:
            yield line.strip()

def process(line):
    # Simulate doing some heavy processing on `line`
    time.sleep(3)
    return line.upper()

def run_parallel(num_workers, lines):
    with ProcessPoolExecutor(max_workers=num_workers) as p:
        futures = {p.submit(process, line) for line in lines}
        for future in cf.as_completed(futures):
            yield future.result()

def write(outfile, lines):
    with open(outfile, "w") as f:
        for line in lines:
            f.write(line + "n")

NUM_WORKERS = 4
if __name__ == "__main__":
    start = time.time()
    lines = reader("infile.txt")
    lines = run_parallel(NUM_WORKERS, lines)
    write("outfile.txt", lines)
    print(time.time() - start)

Input file:

a
b
c
d
e
f
g
h
i
j

Output file:

A
F
B
G
E
D
C
H
I
J

Standard output:

9.016341924667358
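
As the output file shows, as_completed yields results in completion order rather than input order. If order matters, one alternative (a sketch, not part of the answer above) is executor.map, which preserves input order; note that it submits work as fast as it can pull from the input iterable, so it gives less back-pressure than a bounded queue:

import time
from concurrent.futures import ProcessPoolExecutor

def process(line):
    # Simulate doing some heavy processing on `line`
    time.sleep(3)
    return line.upper()

def run_parallel_ordered(num_workers, lines):
    with ProcessPoolExecutor(max_workers=num_workers) as executor:
        # map() yields results in the same order as the input lines
        yield from executor.map(process, lines)

if __name__ == "__main__":
    with open("infile.txt") as f:
        lines = (line.strip() for line in f)
        for result in run_parallel_ordered(4, lines):
            print(result)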

Late to the party. I had a similar problem, basically producers and consumers. As a few people have pointed out, a queue best fits this problem.

You can create an executor pool (threads or processes) and use it in combination with a semaphore to ensure that only n tasks are in flight at a time. Any further task submitted by your generator is blocked until the semaphore counter decreases.
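
A minimal sketch of that idea, using concurrent.futures and a threading.Semaphore (the names and the process body are illustrative only):

import threading
from concurrent.futures import ProcessPoolExecutor

MAX_IN_FLIGHT = 4

def process(line):
    # Placeholder for the real per-line work
    return line.upper()

def run_bounded(lines):
    sem = threading.Semaphore(MAX_IN_FLIGHT)
    with ProcessPoolExecutor(max_workers=MAX_IN_FLIGHT) as executor:
        futures = []
        for line in lines:
            sem.acquire()                         # block while MAX_IN_FLIGHT tasks are pending
            fut = executor.submit(process, line)
            fut.add_done_callback(lambda f: sem.release())
            futures.append(fut)
        for fut in futures:
            yield fut.result()                    # results come back in submission order

if __name__ == "__main__":
    for result in run_bounded(iter("abcdefgh")):
        print(result)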

I found a ready-made solution. Check out this gist.
