搜索文件最后 X 行的最有效方法



我有一个文件,我不知道它会有多大(它可能很大,但大小会有很大差异(。 我想搜索最后 10 行左右,看看它们中是否有任何一行与字符串匹配。 我需要尽可能快速有效地做到这一点,并且想知道是否有比以下更好的方法:

s = "foo"
last_bit = fileObj.readlines()[-10:]
for line in last_bit:
    if line == s:
        print "FOUND"
# Tail
from __future__ import with_statement
find_str = "FIREFOX"                    # String to find
fname = "g:/autoIt/ActiveWin.log_2"     # File to check
with open(fname, "r") as f:
    f.seek (0, 2)           # Seek @ EOF
    fsize = f.tell()        # Get Size
    f.seek (max (fsize-1024, 0), 0) # Set pos @ last n chars
    lines = f.readlines()       # Read to end
lines = lines[-10:]    # Get last 10 lines
# This returns True if any line is exactly find_str + "n"
print find_str + "n" in lines
# If you're searching for a substring
for line in lines:
    if find_str in line:
        print True
        break

这是一个类似于 MizardX 的答案,但在最坏的情况下,在添加块时,它没有明显的二次时间,即在最坏的情况下反复重新扫描工作字符串以查找换行符。

与活动状态解决方案(似乎也是二次的(相比,这不会在给定空文件的情况下爆炸,并且每个块读取一个查找而不是两个。

与生成"尾巴"相比,这是自给自足的。(但"尾巴"是最好的,如果你有它。

与从末端抓取几 kB 并希望它足够相比,这适用于任何行长度。

import os
def reversed_lines(file):
    "Generate the lines of file in reverse order."
    part = ''
    for block in reversed_blocks(file):
        for c in reversed(block):
            if c == 'n' and part:
                yield part[::-1]
                part = ''
            part += c
    if part: yield part[::-1]
def reversed_blocks(file, blocksize=4096):
    "Generate blocks of file's contents in reverse order."
    file.seek(0, os.SEEK_END)
    here = file.tell()
    while 0 < here:
        delta = min(blocksize, here)
        here -= delta
        file.seek(here, os.SEEK_SET)
        yield file.read(delta)

要根据要求使用它,请执行以下操作:

from itertools import islice
def check_last_10_lines(file, key):
    for line in islice(reversed_lines(file), 10):
        if line.rstrip('n') == key:
            print 'FOUND'
            break

编辑:在head((中将map((更改为itertools.imap((。编辑 2:简化reversed_blocks((。编辑 3:避免重新扫描尾部换行符。编辑4:重写了reversed_lines((,因为str.splitlines((忽略了最后的'',正如BrianB注意到的那样(谢谢(。

请注意,在非常旧的 Python 版本中,此处循环中的字符串连接将需要二次时间。至少在过去几年中,CPython自动避免了这个问题。

如果你在POSIX系统上运行Python,你可以使用'tail -10'来检索最后几行。这可能比编写自己的 Python 代码来获取最后 10 行更快。不要直接打开文件,而是从命令"tail -10 文件名"打开管道。如果您确定日志输出(例如,您知道从来没有任何数百或数千个字符长的很长的行(,那么使用列出的"读取最后 2KB"方法之一就可以了。

我认为读取文件的最后 2 KB 左右应该确保你得到 10 行,并且不应该太占用资源。

file_handle = open("somefile")
file_size = file_handle.tell()
file_handle.seek(max(file_size - 2*1024, 0))
# this will get rid of trailing newlines, unlike readlines()
last_10 = file_handle.read().splitlines()[-10:]
assert len(last_10) == 10, "Only read %d lines" % len(last_10)

这是一个使用mmap的版本,看起来非常有效。最大的优点是mmap会自动为您处理文件到内存分页要求。

import os
from mmap import mmap
def lastn(filename, n):
    # open the file and mmap it
    f = open(filename, 'r+')
    m = mmap(f.fileno(), os.path.getsize(f.name))
    nlcount = 0
    i = m.size() - 1 
    if m[i] == 'n': n += 1
    while nlcount < n and i > 0:
        if m[i] == 'n': nlcount += 1
        i -= 1
    if i > 0: i += 2
    return m[i:].splitlines()
target = "target string"
print [l for l in lastn('somefile', 10) if l == target]
我想

我记得当我不得不做类似的事情时,我改编了 Manu Garg 的这篇博文中的代码。

如果你使用的是 unix 机器,os.popen("tail -10 " + filepath).readlines()可能是最快的方法。 否则,这取决于您希望它有多强大。 到目前为止提出的方法都将以一种或另一种方式失败。 为了在最常见的情况下实现健壮性和速度,您可能需要对数搜索之类的东西:使用 file.seek 转到文件末尾减去 1000 个字符,读入它,检查它包含多少行,然后到 EOF 减去 3000 个字符,读取 2000 个字符,计算行数,然后 EOF 减去 7000,读取 4000 个字符, 计算行数等,直到您有尽可能多的行。 但是,如果您确定它总是会在具有合理行长度的文件上运行,则可能不需要它。

您可能还会在 unix tail 命令的源代码中找到一些灵感。

我遇到了这个问题,解析了 LARGE 系统日志文件的最后一小时,并从 activestate 的配方站点使用了这个函数......(http://code.activestate.com/recipes/439045/(

!/usr/bin/env python
# -*-mode: python; coding: iso-8859-1 -*-
#
# Copyright (c) Peter Astrand <astrand@cendio.se>
import os
import string
class BackwardsReader:
    """Read a file line by line, backwards"""
    BLKSIZE = 4096
    def readline(self):
        while 1:
            newline_pos = string.rfind(self.buf, "n")
            pos = self.file.tell()
            if newline_pos != -1:
                # Found a newline
                line = self.buf[newline_pos+1:]
                self.buf = self.buf[:newline_pos]
                if pos != 0 or newline_pos != 0 or self.trailing_newline:
                    line += "n"
                return line
            else:
                if pos == 0:
                    # Start-of-file
                    return ""
                else:
                    # Need to fill buffer
                    toread = min(self.BLKSIZE, pos)
                    self.file.seek(-toread, 1)
                    self.buf = self.file.read(toread) + self.buf
                    self.file.seek(-toread, 1)
                    if pos - toread == 0:
                        self.buf = "n" + self.buf
    def __init__(self, file):
        self.file = file
        self.buf = ""
        self.file.seek(-1, 2)
        self.trailing_newline = 0
        lastchar = self.file.read(1)
        if lastchar == "n":
            self.trailing_newline = 1
            self.file.seek(-1, 2)
# Example usage
br = BackwardsReader(open('bar'))
while 1:
    line = br.readline()
    if not line:
        break
    print repr(line)

它运行得非常好,并且比 fileObj.readlines(([-10:] 等任何东西更有效,它使 python 将整个文件读入内存,然后从中砍掉最后十行。

我接受了mhawke的建议使用mmap并编写了一个使用rfind的版本:

from mmap import mmap
import sys
def reverse_file(f):
    mm = mmap(f.fileno(), 0)
    nl = mm.size() - 1
    prev_nl = mm.size()
    while nl > -1:
        nl = mm.rfind('n', 0, nl)
        yield mm[nl + 1:prev_nl]
        prev_nl = nl + 1
def main():
    # Example usage
    with open('test.txt', 'r+') as infile:
        for line in reverse_file(infile):
            sys.stdout.write(line)

感谢 18 Darius Bacon 的解决方案,但实现速度提高了 30%,并包装到 io 中。基本IO类。

class ReverseFile(io.IOBase):
    def __init__ (self, filename, headers=1):
        self.fp = open(filename)
        self.headers = headers
        self.reverse = self.reversed_lines()
        self.end_position = -1
        self.current_position = -1
    def readline(self, size=-1):
        if self.headers > 0:
            self.headers -= 1
            raw = self.fp.readline(size)
            self.end_position = self.fp.tell()
            return raw
        raw = next(self.reverse)
        if self.current_position > self.end_position:
            return raw
        raise StopIteration
    def reversed_lines(self):
        """Generate the lines of file in reverse order.
        """
        part = ''
        for block in self.reversed_blocks():
            block = block + part
            block = block.split('n')
            block.reverse()
            part = block.pop()
            if block[0] == '':
                block.pop(0)
            for line in block:
                yield line + 'n'
        if part:
            yield part
    def reversed_blocks(self, blocksize=0xFFFF):
        "Generate blocks of file's contents in reverse order."
        file = self.fp
        file.seek(0, os.SEEK_END)
        here = file.tell()
        while 0 < here:
            delta = min(blocksize, here)
            here -= delta
            file.seek(here, os.SEEK_SET)
            self.current_position = file.tell()
            yield file.read(delta)

一个例子

rev = ReverseFile(filename)
for i, line in enumerate(rev):
        print("{0}: {1}".format(i, line.strip()))

您可以将 1,000 字节左右的块从文件末尾读取到缓冲区中,直到有 10 行。

您还可以在反转文件时计算行数,而不是猜测字节偏移量。

lines = 0
chunk_size = 1024
f = file('filename')
f.seek(0, 2)
f.seek(f.tell() - chunk_size)
while True:
    s = f.read(chunk_size)
    lines += s.count('n')
    if lines > NUM_OF_LINES:
        break
    f.seek(f.tell() - chunk_size*2)

现在,文件处于运行readlines()的良好位置。您还可以缓存第一次读取的字符串,以消除两次读取文件的同一部分。

读取文件的最后几个 K,并将其拆分为行以仅返回最后 10 个。

该块的开头不太可能落在行边界上,但无论如何您都会丢弃第一行。

就我个人而言,我很想突破到外壳并调用 tail -n10 来加载文件。但是我并不是真正的Python程序员;)

首先,一个返回列表的函数:

def lastNLines(file, N=10, chunksize=1024):
    lines = None
    file.seek(0,2) # go to eof
    size = file.tell()
    for pos in xrange(chunksize,size-1,chunksize):
        # read a chunk
        file.seek(pos,2)
        chunk = file.read(chunksize)
        if lines is None:
            # first time
            lines = chunk.splitlines()
        else:
            # other times, update the 'first' line with
            # the new data, and re-split
            lines[0:1] = (chunk + lines[0]).splitlines()
        if len(lines) > N:
            return lines[-N:]
    file.seek(0)
    chunk = file.read(size-pos)
    lines[0:1] = (chunk + lines[0]).splitlines()
    return lines[-N:]

其次,一个以相反顺序迭代行的函数:

def iter_lines_reversed(file, chunksize=1024):
    file.seek(0,2)
    size = file.tell()
    last_line = ""
    for pos in xrange(chunksize,size-1,chunksize):
        # read a chunk
        file.seek(pos,2)
        chunk = file.read(chunksize) + last_line
        # split into lines
        lines = chunk.splitlines()
        last_line = lines[0]
        # iterate in reverse order
        for index,line in enumerate(reversed(lines)):
            if index > 0:
                yield line
    # handle the remaining data at the beginning of the file
    file.seek(0)
    chunk = file.read(size-pos) + last_line
    lines = chunk.splitlines()
    for line in reversed(lines):
        yield line

对于您的示例:

s = "foo"
for index, line in enumerate(iter_lines_reversed(fileObj)):
    if line == s:
        print "FOUND"
        break
    elif index+1 >= 10:
        break

编辑:现在自动
获取文件大小编辑2:现在只迭代 10 行。

此解决方案将只读取文件一次,但使用 2 个文件对象指针来获取文件的最后 N 行,而无需重新读取它:

def getLastLines (path, n):
    # return the las N lines from the file indicated in path
    fp = open(path)
    for i in range(n):
        line = fp.readline()
        if line == '':
            return []
    back = open(path)
    for each in fp:
        back.readline()
    result = []
    for line in back:
        result.append(line[:-1])
    return result


s = "foo"
last_bit = getLastLines(r'C:Documents and Settingsricardo.m.reyesMy Documentsdesarrollotail.py', 10)
for line in last_bit:
    if line == s:
        print "FOUND"

也许这可能很有用:

import os.path
path = 'path_to_file'
os.system('tail -n1 ' + path)

执行此问题标题中所述任务的快速而肮脏的解决方案:

"foo" in deque(f, 10)

检查最后十行中是否有任何一行是"foo"。文件f从前到后读取并由内置内容分隔,而deque仅将最后 10 行保留在内存中。

对于已经在文本模式下打开的 UTF-8/UTF-6 编码文件的体面解决方案,因为整个文件都需要在使用时 f.seek ,或者如果您只是在寻找方便的 oneliner。

引自 docs.python.org:

如果未指定 maxlen 或为 None,则 deques 可能会增长到任意长度。否则,双端面将绑定到指定的最大长度。一旦有界长度的 deque 已满,当添加新项目时,相应数量的项目将从另一端丢弃。

包装为函数

from collections import deque
def tail(path: str, n: int = 10, mode: str = "r+") -> deque:
    """
    Reads a file at `path` in file mode `mode` and returns the last `n` files of that file as `deque`.
    """
    with open(path, mode) as f:
        return deque(f, n)
"foo" in tail("/path/to/file", 10)

这将返回最后 10 行作为列表,然后您可以轻松搜索您的行。(兼容 Python 3(

def read_last_n_lines_new(lines_need=10):
    with open('Log.txt', 'rb') as f:
        f.seek(0, 2)
        data = []
        lines_found = 0
        while True:
            try:
                f.seek(-1, 1)
            except:
                break
            finally:
                c = f.read(1)
                f.seek(-1, 1)
            if c == b'n':
                lines_found = lines_found+1
            if lines_found > lines_need or not c:
                break
            data.insert(0, c.decode('utf-8'))
            
        
        lines = []
        cur = ""
        for l in data:
            if(l == 'n'):
                lines.append(cur)
                cur = ''
            else:
                cur = cur + l
        return lines

相关内容

  • 没有找到相关文章

最新更新