Python: stop one function when another function returns a result

I may be approaching this all wrong, but here's where I'm at. I have very large log files I'm trying to search, in some cases up to 30 GB. I'm writing a script to pull out information and have been playing with multiprocessing to speed it up a bit. Right now I'm testing two functions running at the same time, one searching from the top of the file and one from the bottom, and that seems to work. I'd like to know if it's possible to stop one function once the other gets a result. For example, if the top function finds a result, both of them stop. That way I can build this out as needed.

#!/usr/bin/env python
from file_read_backwards import FileReadBackwards
from multiprocessing import Process

z = "log.log"

def top():
    target = "test"
    found = None
    with open(z) as src:
        for line in src:  # iteration ends on its own at EOF
            if target in line:
                found = line
                break
    print(found)

def bottom():
    target = "text"
    found = None
    with FileReadBackwards(z) as src:
        for line in src:  # yields lines from the end of the file, backwards
            if target in line:
                found = line
                break
    print(found)

if __name__ == '__main__':
    p1 = Process(target=top)
    p1.start()
    p2 = Process(target=bottom)
    p2.start()
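
One way to get the "both stop" behavior the question asks for is a shared multiprocessing.Event that each worker checks as it loops. Here is a minimal sketch of the forward half under that assumption (the found argument is my name, not from the question); bottom would be the symmetric version iterating with FileReadBackwards:

from multiprocessing import Event, Process

def top(found):
    target = "test"
    with open("log.log") as src:
        for line in src:
            if found.is_set():
                return  # the other process already found a match
            if target in line:
                print(line)
                found.set()  # tell the other process to stop
                return

if __name__ == '__main__':
    found = Event()
    p1 = Process(target=top, args=(found,))
    p1.start()
    p1.join()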

Here's a proof of concept of the approach I mentioned in the comments:

import os
import random
import sys
from multiprocessing import Process, Value

def search(proc_no, file_name, seek_to, max_size, find, flag):
    stop_at = seek_to + max_size
    with open(file_name) as f:
        if seek_to:
            f.seek(seek_to - 1)
            prev_char = f.read(1)
            if prev_char != '\n':
                # Landed in the middle of a line. Skip back one (or
                # maybe more) lines so this line isn't excluded. Start
                # by seeking back 256 bytes, then 512 if necessary, etc.
                exponent = 8
                pos = seek_to
                while pos >= seek_to:
                    pos = f.seek(max(0, pos - (2 ** exponent)))
                    f.readline()
                    pos = f.tell()
                    exponent += 1
        while True:
            if flag.value:
                break
            line = f.readline()
            if not line:
                break  # EOF
            data = line.strip()
            if data == find:
                flag.value = proc_no
                print(data)
                break
            if f.tell() > stop_at:
                break

if __name__ == '__main__':
    # list.txt contains lines with the numbers 1 to 1000001
    file_name = 'list.txt'
    info = os.stat(file_name)
    file_size = info.st_size
    if len(sys.argv) == 1:
        # Pick a random value from list.txt
        num_lines = 1000001
        choices = list(range(1, num_lines + 1))
        choices.append('XXX')  # a value that isn't in the file, to exercise the not-found path
        find = str(random.choice(choices))
    else:
        find = sys.argv[1]
    num_procs = 4
    chunk_size, remainder = divmod(file_size, num_procs)
    max_size = chunk_size + remainder
    flag = Value('i', 0)
    procs = []
    print(f'Using {num_procs} processes to look for {find} in {file_name}')
    for i in range(num_procs):
        seek_to = i * chunk_size
        proc = Process(target=search, args=(i + 1, file_name, seek_to, max_size, find, flag))
        procs.append(proc)
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    if flag.value:
        print(find, 'found by proc', flag.value)
    else:
        print(find, 'not found')
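
For reference, a quick way to generate the list.txt file the script expects (my assumption about the exact format: the numbers 1 to 1000001, one per line):

with open('list.txt', 'w') as f:
    for i in range(1, 1000002):
        f.write(f'{i}\n')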

After reading various posts[1] about reading files with multiprocessing and multithreading, it seems that neither is a great approach here, due to potential disk thrashing and serialized reads. So here's a different, much simpler approach that is also much faster (at least for the million-line file I was trying it on):

import mmap
import sys
def search_file(file_name, text, encoding='utf-8'):
    text = text.encode(encoding)
    with open(file_name, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
            index = m.find(text)
            if index > -1:
                # Found a match; now find beginning of line that
                # contains match so we can grab the whole line.
                while index > 0:
                    index -= 1
                    if m[index] == 10:  # 10 == ord('\n')
                        index += 1
                        break
                else:
                    index = 0
                m.seek(index)
                line = m.readline()
                return line.decode(encoding)
if __name__ == '__main__':
    file_name, search_string = sys.argv[1:]
    line = search_file(file_name, search_string)
    sys.stdout.write(line if line is not None else f'Not found in {file_name}: {search_string}\n')
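
Incidentally, mmap objects also have rfind(), which scans backwards from the end of the buffer, so the question's bottom-up search falls out almost for free. A sketch of a hypothetical variant (search_file_backwards is my name, not part of the answer above) that returns the last matching line instead of the first:

import mmap

def search_file_backwards(file_name, text, encoding='utf-8'):
    text = text.encode(encoding)
    with open(file_name, 'rb') as f:
        with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as m:
            index = m.rfind(text)  # last occurrence instead of first
            if index > -1:
                # rfind returns -1 when the match is on the first line,
                # so start conveniently becomes 0 in that case.
                start = m.rfind(b'\n', 0, index) + 1
                m.seek(start)
                return m.readline().decode(encoding)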

I'd be curious to know how this performs on a 30 GB log file.

[1] Including this one.

A simple example using multiprocessing.Pool and a callback function. It terminates the remaining pool processes once one of them returns a result.

You can use this approach to add as many processes as you like, searching from different offsets in the file; there's a sketch of that adaptation right after the example below.

import math
import time
from multiprocessing import Pool
from random import random

def search(pid, wait):
    """Sleep for wait seconds, return PID
    """
    time.sleep(wait)
    return pid

def done(result):
    """Do something with result and stop other processes
    """
    print("Process: %d done." % result)
    pool.terminate()
    print("Terminate Pool")

if __name__ == '__main__':
    pool = Pool(2)
    pool.apply_async(search, (1, math.ceil(random() * 3)), callback=done)
    pool.apply_async(search, (2, math.ceil(random() * 3)), callback=done)
    # do other stuff ...
    # Wait for a result
    pool.close()
    pool.join()  # block the main thread
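
To tie this back to the file-search problem, here is a hedged sketch of how the same callback pattern might be adapted: each worker scans one chunk of the file, and the callback only terminates the pool once a worker actually finds something (search_from, the chunk math, and the log.log/test values are my assumptions, not from the answer above):

import os
from multiprocessing import Pool

def search_from(file_name, seek_to, chunk_size, target):
    """Scan one chunk of the file for target; return the matching line or None."""
    with open(file_name) as f:
        f.seek(seek_to)
        if seek_to:
            f.readline()  # skip the partial line at the chunk boundary
        while f.tell() <= seek_to + chunk_size:
            line = f.readline()
            if not line:
                break  # EOF
            if target in line:
                return line.strip()
    return None

def found(result):
    if result is not None:  # ignore workers that finished without a match
        print('found:', result)
        pool.terminate()

if __name__ == '__main__':
    file_name, target = 'log.log', 'test'
    num_procs = 4
    chunk_size = os.stat(file_name).st_size // num_procs + 1
    pool = Pool(num_procs)
    for i in range(num_procs):
        pool.apply_async(search_from,
                         (file_name, i * chunk_size, chunk_size, target),
                         callback=found)
    pool.close()
    pool.join()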

This is basically the same as Blurp's answer, but I shortened it and made it a bit more general. As you can see, top runs what would otherwise be an infinite loop, but bottom stops it immediately. Note that a plain module-level global won't work for the flag, since each Process gets its own copy of it; it has to live in shared memory, e.g. a multiprocessing.Value:

from multiprocessing import Process, Value

def top(val_not_found):
    i = 0
    while val_not_found.value:  # spins until the shared flag is cleared
        i += 1

def bottom(val_not_found):
    val_not_found.value = False  # signal top to stop

if __name__ == '__main__':
    val_not_found = Value('b', True)  # shared boolean flag, visible to both processes
    p1 = Process(target=top, args=(val_not_found,))
    p2 = Process(target=bottom, args=(val_not_found,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
