我有这个 csv 日志文件,它的大小 (GB) 很大,没有标题行:
1,<timestamp>,BEGIN
1,<timestamp>,fetched from db
1,<timestamp>,some processing
2,<timestamp>,BEGIN
2,<timestamp>,fetched from db
1,<timestamp>,returned success
3,<timestamp>,BEGIN
4,<timestamp>,BEGIN
1,<timestamp>,END
3,<timestamp>,some work
2,<timestamp>,some processing
4,<timestamp>,waiting for
2,<timestamp>,ERROR
3,<timestamp>,attempting other work
4,<timestamp>,ERROR
3,<timestamp>,attempting other work
每行都是一个跟踪日志,第一个字段是RequestID
。
需要扫描文件并仅存储导致'ERROR'
到另一个文件的请求的日志。
import csv
def readFile(filename):
with open(filename, 'r') as fn:
reader = csv.reader(fn)
for line in reversed(list(reader)):
yield (line)
def wrt2File():
rows = readFile('log.csv')
with open('error.csv', 'w') as fn:
writer = csv.writer(fn)
errReqIds = []
for row in rows:
if 'ERROR' in row:
errReqIds.append(row[0])
if row[0] in errReqIds:
writer.writerow(row)
wrt2File()
如何改进我的代码,使其不使用内存进行readFile操作和此代码的可重用性?我不想使用熊猫,如果有更好的选择。
这看起来根本不像CSV。 我可以提出以下建议:
def extract(filename):
previous = dict()
current = set()
with open(filename) as inputfile:
for line in inputfile:
id, rest = line.split(' ')
if 'ERROR' in line:
if id in previous:
for kept in previous[id]:
yield(kept)
del previous[id]
yield(line)
current.add(id)
elif id in current:
yield(line)
# Maybe do something here to remove really old entries from previous
def main():
import sys
for filename in sys.argv[1:]:
for line in extract(filename):
print(line)
if __name__ == '__main__':
main()
这只会打印到标准输出。您可以重构它以接受输出文件名作为选项,并根据需要在该文件句柄上使用write
。
由于您的文件很大,您可能需要一个避免将整个文件加载到内存中的解决方案。以下方法可以完成此工作:
def find_errors(filename):
with open(filename) as f:
return {l[0:3] for l in f if 'ERROR' in l}
def wrt2File():
error_ids = find_errors('log.csv')
with open('error.csv', 'w') as fw, open('log.csv') as fr:
[fw.write(l) for l in fr if l[0:3] in error_ids]
请注意,我假设 id 是该行的前 3 个字符,如果需要,请进行更改。
这里有一些应该相当快的东西,可能是因为它将整个文件读入内存来处理它。你还没有定义你所说的"效率"是什么意思,所以我假设它是速度,你的计算机有足够的内存来做到这一点——因为这就是你问题中的代码所做的。
import csv
from itertools import groupby
from operator import itemgetter
REQUEST_ID = 0 # Column
RESULT = 2 # Column
ERROR_RESULT = 'ERROR'
keyfunc = itemgetter(REQUEST_ID)
def wrt2File(inp_filename, out_filename):
# Read log data into memory and sort by request id column.
with open(inp_filename, 'r', newline='') as inp:
rows = list(csv.reader(inp))
rows.sort(key=keyfunc)
with open(out_filename, 'w', newline='') as outp:
csv_writer = csv.writer(outp)
for k, g in groupby(rows, key=keyfunc):
g = list(g)
# If any of the lines in group have error indicator, write
# them to error csv.
has_error = False
for row in g:
if row[RESULT] == ERROR_RESULT:
has_error = True
break
if has_error:
csv_writer.writerows(g)
wrt2File('log.csv', 'error.csv')
更新:
既然我现在知道你不想把它全部读到内存里,这里有一个选择。它会读取整个文件两次。第一次只是确定哪些请求 ID 在记录它们的行中存在错误。此信息第二次用于确定要写入错误 csv 的行。您的操作系统应该进行一定数量的文件缓冲/和数据缓存,因此希望这是一个可以接受的权衡。
请务必注意,输出文件中有错误的请求 ID 行不会分组在一起,因为此方法不会对它们进行排序。
import csv
REQUEST_ID = 0 # Column
RESULT = 2 # Column
ERROR_RESULT = 'ERROR'
def wrt2File(inp_filename, out_filename):
# First pass:
# Read entire log file and determine which request id had errors.
error_requests = set() # Used to filter rows in second pass.
with open(inp_filename, 'r', newline='') as inp:
for row in csv.reader(inp):
if row[RESULT] == ERROR_RESULT:
error_requests.add(row[REQUEST_ID])
# Second pass:
# Read log file again and write rows associated with request ids
# which had errors to the output csv
with open(inp_filename, 'r', newline='') as inp:
with open(out_filename, 'w', newline='') as outp:
csv_writer = csv.writer(outp)
for row in csv.reader(inp)
if row[RESULT] in error_requests:
csv_writer.writerow(row)
wrt2File('log.csv', 'error.csv')
print('done')