我有一个文件,我不知道它会有多大(它可能很大,但大小会有很大差异(。 我想搜索最后 10 行左右,看看它们中是否有任何一行与字符串匹配。 我需要尽可能快速有效地做到这一点,并且想知道是否有比以下更好的方法:
s = "foo"
last_bit = fileObj.readlines()[-10:]
for line in last_bit:
if line == s:
print "FOUND"
# Tail
from __future__ import with_statement
find_str = "FIREFOX" # String to find
fname = "g:/autoIt/ActiveWin.log_2" # File to check
with open(fname, "r") as f:
f.seek (0, 2) # Seek @ EOF
fsize = f.tell() # Get Size
f.seek (max (fsize-1024, 0), 0) # Set pos @ last n chars
lines = f.readlines() # Read to end
lines = lines[-10:] # Get last 10 lines
# This returns True if any line is exactly find_str + "n"
print find_str + "n" in lines
# If you're searching for a substring
for line in lines:
if find_str in line:
print True
break
这是一个类似于 MizardX 的答案,但在最坏的情况下,在添加块时,它没有明显的二次时间,即在最坏的情况下反复重新扫描工作字符串以查找换行符。
与活动状态解决方案(似乎也是二次的(相比,这不会在给定空文件的情况下爆炸,并且每个块读取一个查找而不是两个。
与生成"尾巴"相比,这是自给自足的。(但"尾巴"是最好的,如果你有它。
与从末端抓取几 kB 并希望它足够相比,这适用于任何行长度。
import os
def reversed_lines(file):
"Generate the lines of file in reverse order."
part = ''
for block in reversed_blocks(file):
for c in reversed(block):
if c == 'n' and part:
yield part[::-1]
part = ''
part += c
if part: yield part[::-1]
def reversed_blocks(file, blocksize=4096):
"Generate blocks of file's contents in reverse order."
file.seek(0, os.SEEK_END)
here = file.tell()
while 0 < here:
delta = min(blocksize, here)
here -= delta
file.seek(here, os.SEEK_SET)
yield file.read(delta)
要根据要求使用它,请执行以下操作:
from itertools import islice
def check_last_10_lines(file, key):
for line in islice(reversed_lines(file), 10):
if line.rstrip('n') == key:
print 'FOUND'
break
编辑:在head((中将map((更改为itertools.imap((。编辑 2:简化reversed_blocks((。编辑 3:避免重新扫描尾部换行符。编辑4:重写了reversed_lines((,因为str.splitlines((忽略了最后的'',正如BrianB注意到的那样(谢谢(。
请注意,在非常旧的 Python 版本中,此处循环中的字符串连接将需要二次时间。至少在过去几年中,CPython自动避免了这个问题。
如果你在POSIX系统上运行Python,你可以使用'tail -10'来检索最后几行。这可能比编写自己的 Python 代码来获取最后 10 行更快。不要直接打开文件,而是从命令"tail -10 文件名"打开管道。如果您确定日志输出(例如,您知道从来没有任何数百或数千个字符长的很长的行(,那么使用列出的"读取最后 2KB"方法之一就可以了。
我认为读取文件的最后 2 KB 左右应该确保你得到 10 行,并且不应该太占用资源。
file_handle = open("somefile")
file_size = file_handle.tell()
file_handle.seek(max(file_size - 2*1024, 0))
# this will get rid of trailing newlines, unlike readlines()
last_10 = file_handle.read().splitlines()[-10:]
assert len(last_10) == 10, "Only read %d lines" % len(last_10)
这是一个使用mmap
的版本,看起来非常有效。最大的优点是mmap
会自动为您处理文件到内存分页要求。
import os
from mmap import mmap
def lastn(filename, n):
# open the file and mmap it
f = open(filename, 'r+')
m = mmap(f.fileno(), os.path.getsize(f.name))
nlcount = 0
i = m.size() - 1
if m[i] == 'n': n += 1
while nlcount < n and i > 0:
if m[i] == 'n': nlcount += 1
i -= 1
if i > 0: i += 2
return m[i:].splitlines()
target = "target string"
print [l for l in lastn('somefile', 10) if l == target]
我记得当我不得不做类似的事情时,我改编了 Manu Garg 的这篇博文中的代码。
如果你使用的是 unix 机器,os.popen("tail -10 " + filepath).readlines()
可能是最快的方法。 否则,这取决于您希望它有多强大。 到目前为止提出的方法都将以一种或另一种方式失败。 为了在最常见的情况下实现健壮性和速度,您可能需要对数搜索之类的东西:使用 file.seek 转到文件末尾减去 1000 个字符,读入它,检查它包含多少行,然后到 EOF 减去 3000 个字符,读取 2000 个字符,计算行数,然后 EOF 减去 7000,读取 4000 个字符, 计算行数等,直到您有尽可能多的行。 但是,如果您确定它总是会在具有合理行长度的文件上运行,则可能不需要它。
您可能还会在 unix tail
命令的源代码中找到一些灵感。
我遇到了这个问题,解析了 LARGE 系统日志文件的最后一小时,并从 activestate 的配方站点使用了这个函数......(http://code.activestate.com/recipes/439045/(
!/usr/bin/env python
# -*-mode: python; coding: iso-8859-1 -*-
#
# Copyright (c) Peter Astrand <astrand@cendio.se>
import os
import string
class BackwardsReader:
"""Read a file line by line, backwards"""
BLKSIZE = 4096
def readline(self):
while 1:
newline_pos = string.rfind(self.buf, "n")
pos = self.file.tell()
if newline_pos != -1:
# Found a newline
line = self.buf[newline_pos+1:]
self.buf = self.buf[:newline_pos]
if pos != 0 or newline_pos != 0 or self.trailing_newline:
line += "n"
return line
else:
if pos == 0:
# Start-of-file
return ""
else:
# Need to fill buffer
toread = min(self.BLKSIZE, pos)
self.file.seek(-toread, 1)
self.buf = self.file.read(toread) + self.buf
self.file.seek(-toread, 1)
if pos - toread == 0:
self.buf = "n" + self.buf
def __init__(self, file):
self.file = file
self.buf = ""
self.file.seek(-1, 2)
self.trailing_newline = 0
lastchar = self.file.read(1)
if lastchar == "n":
self.trailing_newline = 1
self.file.seek(-1, 2)
# Example usage
br = BackwardsReader(open('bar'))
while 1:
line = br.readline()
if not line:
break
print repr(line)
它运行得非常好,并且比 fileObj.readlines(([-10:] 等任何东西更有效,它使 python 将整个文件读入内存,然后从中砍掉最后十行。
我接受了mhawke的建议使用mmap
并编写了一个使用rfind
的版本:
from mmap import mmap
import sys
def reverse_file(f):
mm = mmap(f.fileno(), 0)
nl = mm.size() - 1
prev_nl = mm.size()
while nl > -1:
nl = mm.rfind('n', 0, nl)
yield mm[nl + 1:prev_nl]
prev_nl = nl + 1
def main():
# Example usage
with open('test.txt', 'r+') as infile:
for line in reverse_file(infile):
sys.stdout.write(line)
感谢 18 Darius Bacon 的解决方案,但实现速度提高了 30%,并包装到 io 中。基本IO类。
class ReverseFile(io.IOBase):
def __init__ (self, filename, headers=1):
self.fp = open(filename)
self.headers = headers
self.reverse = self.reversed_lines()
self.end_position = -1
self.current_position = -1
def readline(self, size=-1):
if self.headers > 0:
self.headers -= 1
raw = self.fp.readline(size)
self.end_position = self.fp.tell()
return raw
raw = next(self.reverse)
if self.current_position > self.end_position:
return raw
raise StopIteration
def reversed_lines(self):
"""Generate the lines of file in reverse order.
"""
part = ''
for block in self.reversed_blocks():
block = block + part
block = block.split('n')
block.reverse()
part = block.pop()
if block[0] == '':
block.pop(0)
for line in block:
yield line + 'n'
if part:
yield part
def reversed_blocks(self, blocksize=0xFFFF):
"Generate blocks of file's contents in reverse order."
file = self.fp
file.seek(0, os.SEEK_END)
here = file.tell()
while 0 < here:
delta = min(blocksize, here)
here -= delta
file.seek(here, os.SEEK_SET)
self.current_position = file.tell()
yield file.read(delta)
一个例子
rev = ReverseFile(filename)
for i, line in enumerate(rev):
print("{0}: {1}".format(i, line.strip()))
您可以将 1,000 字节左右的块从文件末尾读取到缓冲区中,直到有 10 行。
您还可以在反转文件时计算行数,而不是猜测字节偏移量。
lines = 0
chunk_size = 1024
f = file('filename')
f.seek(0, 2)
f.seek(f.tell() - chunk_size)
while True:
s = f.read(chunk_size)
lines += s.count('n')
if lines > NUM_OF_LINES:
break
f.seek(f.tell() - chunk_size*2)
现在,文件处于运行readlines()
的良好位置。您还可以缓存第一次读取的字符串,以消除两次读取文件的同一部分。
读取文件的最后几个 K,并将其拆分为行以仅返回最后 10 个。
该块的开头不太可能落在行边界上,但无论如何您都会丢弃第一行。
就我个人而言,我很想突破到外壳并调用 tail -n10 来加载文件。但是我并不是真正的Python程序员;)
首先,一个返回列表的函数:
def lastNLines(file, N=10, chunksize=1024):
lines = None
file.seek(0,2) # go to eof
size = file.tell()
for pos in xrange(chunksize,size-1,chunksize):
# read a chunk
file.seek(pos,2)
chunk = file.read(chunksize)
if lines is None:
# first time
lines = chunk.splitlines()
else:
# other times, update the 'first' line with
# the new data, and re-split
lines[0:1] = (chunk + lines[0]).splitlines()
if len(lines) > N:
return lines[-N:]
file.seek(0)
chunk = file.read(size-pos)
lines[0:1] = (chunk + lines[0]).splitlines()
return lines[-N:]
其次,一个以相反顺序迭代行的函数:
def iter_lines_reversed(file, chunksize=1024):
file.seek(0,2)
size = file.tell()
last_line = ""
for pos in xrange(chunksize,size-1,chunksize):
# read a chunk
file.seek(pos,2)
chunk = file.read(chunksize) + last_line
# split into lines
lines = chunk.splitlines()
last_line = lines[0]
# iterate in reverse order
for index,line in enumerate(reversed(lines)):
if index > 0:
yield line
# handle the remaining data at the beginning of the file
file.seek(0)
chunk = file.read(size-pos) + last_line
lines = chunk.splitlines()
for line in reversed(lines):
yield line
对于您的示例:
s = "foo"
for index, line in enumerate(iter_lines_reversed(fileObj)):
if line == s:
print "FOUND"
break
elif index+1 >= 10:
break
编辑:现在自动
获取文件大小编辑2:现在只迭代 10 行。
此解决方案将只读取文件一次,但使用 2 个文件对象指针来获取文件的最后 N 行,而无需重新读取它:
def getLastLines (path, n):
# return the las N lines from the file indicated in path
fp = open(path)
for i in range(n):
line = fp.readline()
if line == '':
return []
back = open(path)
for each in fp:
back.readline()
result = []
for line in back:
result.append(line[:-1])
return result
s = "foo"
last_bit = getLastLines(r'C:Documents and Settingsricardo.m.reyesMy Documentsdesarrollotail.py', 10)
for line in last_bit:
if line == s:
print "FOUND"
也许这可能很有用:
import os.path
path = 'path_to_file'
os.system('tail -n1 ' + path)
执行此问题标题中所述任务的快速而肮脏的解决方案:
"foo" in deque(f, 10)
检查最后十行中是否有任何一行是"foo"。文件f
从前到后读取并由内置内容分隔,而deque
仅将最后 10 行保留在内存中。
对于已经在文本模式下打开的 UTF-8/UTF-6 编码文件的体面解决方案,因为整个文件都需要在使用时 f.seek
,或者如果您只是在寻找方便的 oneliner。
引自 docs.python.org:
如果未指定 maxlen 或为 None,则 deques 可能会增长到任意长度。否则,双端面将绑定到指定的最大长度。一旦有界长度的 deque 已满,当添加新项目时,相应数量的项目将从另一端丢弃。
包装为函数
from collections import deque
def tail(path: str, n: int = 10, mode: str = "r+") -> deque:
"""
Reads a file at `path` in file mode `mode` and returns the last `n` files of that file as `deque`.
"""
with open(path, mode) as f:
return deque(f, n)
"foo" in tail("/path/to/file", 10)
这将返回最后 10 行作为列表,然后您可以轻松搜索您的行。(兼容 Python 3(
def read_last_n_lines_new(lines_need=10):
with open('Log.txt', 'rb') as f:
f.seek(0, 2)
data = []
lines_found = 0
while True:
try:
f.seek(-1, 1)
except:
break
finally:
c = f.read(1)
f.seek(-1, 1)
if c == b'n':
lines_found = lines_found+1
if lines_found > lines_need or not c:
break
data.insert(0, c.decode('utf-8'))
lines = []
cur = ""
for l in data:
if(l == 'n'):
lines.append(cur)
cur = ''
else:
cur = cur + l
return lines