Efficient way to extract from a huge CSV log file with reusable code, without pandas (and without reading it all into memory)



I have a CSV log file that is huge (several GB) and has no header row:

1,<timestamp>,BEGIN
1,<timestamp>,fetched from db
1,<timestamp>,some processing
2,<timestamp>,BEGIN
2,<timestamp>,fetched from db
1,<timestamp>,returned success
3,<timestamp>,BEGIN
4,<timestamp>,BEGIN
1,<timestamp>,END
3,<timestamp>,some work
2,<timestamp>,some processing
4,<timestamp>,waiting for
2,<timestamp>,ERROR
3,<timestamp>,attempting other work
4,<timestamp>,ERROR
3,<timestamp>,attempting other work 

Each line is a trace log; the first field is the RequestID.
I need to scan the file and write only the logs of requests that ended in 'ERROR' to another file.

import csv

def readFile(filename):
    with open(filename, 'r') as fn:
        reader = csv.reader(fn)
        # list(reader) pulls the whole file into memory before reversing it
        for line in reversed(list(reader)):
            yield line

def wrt2File():
    rows = readFile('log.csv')
    with open('error.csv', 'w') as fn:
        writer = csv.writer(fn)
        errReqIds = []
        for row in rows:
            # Reading backwards, a request's ERROR row is seen before its earlier rows
            if 'ERROR' in row:
                errReqIds.append(row[0])
            if row[0] in errReqIds:
                writer.writerow(row)

wrt2File()

How can I improve my code so that readFile doesn't load the whole file into memory, and how can I make the code more reusable? I don't want to use pandas if there is a better alternative.

This doesn't really look like CSV at all. I would suggest something along these lines:

def extract(filename):
    # Lines seen so far for requests that are not (yet) known to have errored.
    previous = dict()
    # Request ids that are already known to have errored.
    current = set()
    with open(filename) as inputfile:
        for line in inputfile:
            id, rest = line.split(',', 1)
            if 'ERROR' in line:
                # Flush any earlier lines kept for this request.
                if id in previous:
                    for kept in previous[id]:
                        yield kept
                    del previous[id]
                yield line
                current.add(id)
            elif id in current:
                yield line
            else:
                # Remember the line in case this request errors later on.
                previous.setdefault(id, []).append(line)
    # Maybe do something here to remove really old entries from previous
def main():
    import sys
    for filename in sys.argv[1:]:
        for line in extract(filename):
            print(line, end='')  # lines already end with a newline
if __name__ == '__main__':
    main()

This only prints to standard output. You could refactor it to accept an output filename as an option and call write on that file handle instead.
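For example, here is a minimal sketch of that refactor (the extract_to_file wrapper and its argument order are my own additions, not part of the answer above), reusing the extract() generator:

import sys

def extract_to_file(input_names, output_name):
    # Hypothetical wrapper around extract(): stream matching lines straight
    # into the output file instead of printing them to stdout.
    with open(output_name, 'w') as out:
        for name in input_names:
            for line in extract(name):
                out.write(line)  # extract() yields raw lines, newline included

if __name__ == '__main__':
    # e.g. python extract_errors.py error.csv log1.csv log2.csv
    extract_to_file(sys.argv[2:], sys.argv[1])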

Since your file is huge, you may want a solution that avoids loading the entire file into memory. The following should do the job:

def find_errors(filename):
    # One pass over the log: collect the ids of all requests that hit an ERROR.
    with open(filename) as f:
        return {l[0:3] for l in f if 'ERROR' in l}

def wrt2File():
    error_ids = find_errors('log.csv')
    # Second pass: copy every line whose request id is in the error set.
    with open('error.csv', 'w') as fw, open('log.csv') as fr:
        for l in fr:
            if l[0:3] in error_ids:
                fw.write(l)

Note that I'm assuming the id is the first 3 characters of each line; change that if necessary.
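If the request ids vary in length, a slightly more robust variant (my tweak, not part of the answer above) is to take everything before the first comma instead of slicing a fixed three-character prefix:

def find_errors(filename):
    # Collect the request id (the text before the first comma) of every ERROR line.
    with open(filename) as f:
        return {l.split(',', 1)[0] for l in f if 'ERROR' in l}

def wrt2File():
    error_ids = find_errors('log.csv')
    with open('error.csv', 'w') as fw, open('log.csv') as fr:
        for l in fr:
            if l.split(',', 1)[0] in error_ids:
                fw.write(l)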

Here's something that should be fairly fast, largely because it reads the whole file into memory to process it. You haven't defined what you mean by "efficient", so I'm assuming it means speed and that your computer has enough memory to do this, since that's what the code in your question does anyway.

import csv
from itertools import groupby
from operator import itemgetter

REQUEST_ID = 0  # Column
RESULT = 2  # Column
ERROR_RESULT = 'ERROR'
keyfunc = itemgetter(REQUEST_ID)
def wrt2File(inp_filename, out_filename):
    # Read log data into memory and sort by request id column.
    with open(inp_filename, 'r', newline='') as inp:
        rows = list(csv.reader(inp))
        rows.sort(key=keyfunc)
    with open(out_filename, 'w', newline='') as outp:
        csv_writer = csv.writer(outp)
        for k, g in groupby(rows, key=keyfunc):
            g = list(g)
            # If any of the lines in group have error indicator, write
            # them to error csv.
            has_error = False
            for row in g:
                if row[RESULT] == ERROR_RESULT:
                    has_error = True
                    break
            if has_error:
                csv_writer.writerows(g)
wrt2File('log.csv', 'error.csv')

Update:

Since I now know you don't want to read it all into memory, here's an alternative. It reads the entire file twice. The first pass only determines which request ids had an error on any of their lines. That information is then used on the second pass to decide which rows to write to the error csv. Your OS should do a fair amount of file buffering and data caching, so hopefully this is an acceptable trade-off.

It's important to note that the rows for request ids that had errors will not be grouped together in the output file, because this approach doesn't sort them.

import csv

REQUEST_ID = 0  # Column
RESULT = 2  # Column
ERROR_RESULT = 'ERROR'
def wrt2File(inp_filename, out_filename):
    # First pass:
    #   Read entire log file and determine which request id had errors.
    error_requests = set()  # Used to filter rows in second pass.
    with open(inp_filename, 'r', newline='') as inp:
        for row in csv.reader(inp):
            if row[RESULT] == ERROR_RESULT:
                error_requests.add(row[REQUEST_ID])
    # Second pass:
    #   Read log file again and write rows associated with request ids
    #   which had errors to the output csv
    with open(inp_filename, 'r', newline='') as inp:
        with open(out_filename, 'w', newline='') as outp:
            csv_writer = csv.writer(outp)
            for row in csv.reader(inp):
                if row[REQUEST_ID] in error_requests:
                    csv_writer.writerow(row)

wrt2File('log.csv', 'error.csv')
print('done')
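If you do need the error rows grouped by request id, one option (my suggestion, assuming the filtered error.csv is far smaller than the original log and fits in memory; the group_error_csv name is mine) is to sort the output file afterwards. Python's sort is stable, so each request's rows keep their original relative order:

import csv

def group_error_csv(filename='error.csv'):
    # Load the (much smaller) filtered file, sort by request id, write it back.
    with open(filename, 'r', newline='') as f:
        rows = list(csv.reader(f))
    rows.sort(key=lambda row: row[0])  # stable sort preserves per-request order
    with open(filename, 'w', newline='') as f:
        csv.writer(f).writerows(rows)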
