Processing a huge file in multiple threads and writing it back to another file



I have a huge XML file (almost 5 GB). I am trying to search through the whole file, find certain tags, and rename them. I used the same idea here of splitting the file into 10 megabyte chunks, searching each chunk, and, if a chunk contains the search term, handing the chunk to another helper that reads it line by line and replaces the tags. It doesn't work! It seems that when the queue is joined and the file is written back, the resulting file starts from some arbitrary place.

import re, threading, Queue
FILE_R = "C:\Users\USOMZIADesktop\ABB_Work\ERCOT\Modifying_cim_model\omid2.xml"
FILE_WR = "C:\Users\USOMZIADesktop\ABB_Work\ERCOT\Modifying_cim_model\x3.xml"
def get_chunks(file_r, size = 1024 * 1024):
    # Yield (start, length) descriptors; after seeking `size` bytes ahead,
    # readline() extends the chunk to the next newline so no line is split
    # across two chunks.
    with open(file_r, 'rb') as f:
        while 1:
            start = f.tell()
            f.seek(size, 1)
            s = f.readline()
            yield start, f.tell() - start
            if not s:
                break
def process_line_by_line(file_r, chunk):
    # Rewrite the chunk line by line, applying the first matching tag
    # replacement from mapp to each line.
    with open(file_r, "rb") as f:
        f.seek(chunk[0])
        read_line_list = []
        for line_f in f.read(chunk[1]).splitlines():
            find_match = False
            for match_str in mapp:
                if match_str in str(line_f):
                    find_match = True
                    new_line = str(line_f).replace(match_str, mapp[match_str]) 
                    read_line_list.append(new_line)
                    break
            if not find_match:
                read_line_list.append(str(line_f))
    return read_line_list
def process(file_r, chunk):
    # Scan the whole chunk first; only fall back to the slower line-by-line
    # replacement when the pattern actually occurs in it.
    read_group_list = []
    with open(file_r, "r") as f:
        f.seek(chunk[0])
        s = f.read(chunk[1])
        if len(pattern.findall(s)) > 0:
            read_group_list = process_line_by_line(file_r, chunk)
        else:
            read_group_list = f.read(chunk[1]).splitlines()
    return read_group_list
class Worker(threading.Thread):
    # Pull (file, chunk) jobs off the queue until a None sentinel arrives.
    def run(self):
        while 1:
            chunk = queue.get()
            if chunk is None:
                break
            result.append(process(*chunk))
            queue.task_done()       


import time, sys
start_time = time.time()
pattern_list = []
mapp = {"cim:ConformLoad rdf:ID": "cim:CustomerLoad rdf:ID", "cim:Load rdf:ID": "cim:CustomerLoad rdf:ID", "cim:NonConformLoad rdf:ID": "cim:CustomerLoad rdf:ID", 
        "cim:InductionMotorLoad rdf:ID": "cim:CustomerLoad rdf:ID", "cim:NonConformLoadGroup rdf:ID": "cim:ConformLoadGroup rdf:ID",
        "cim:NonConformLoad.LoadGroup": "cim:ConformLoad.LoadGroup",
        "/cim:ConformLoad>": "/cim:CustomerLoad>", "/cim:Load>": "/cim:CustomerLoad>", "/cim:NonConformLoad>": "/cim:CustomerLoad>",
        "/cim:InductionMotorLoad>": "/cim:CustomerLoad>", "/cim:NonConformLoadGroup>": "/cim:ConformLoadGroup>"}
reg_string =""
for key in mapp:
    reg_string = reg_string + key+ "|"
# to delete the last |
reg_string = list(reg_string)[:-1]
reg_string = ''.join(reg_string)
pattern = re.compile(r"cim:%s.*" % reg_string)
# Binding the method once here is faster than looking up pattern.search
# inside the loop
search = pattern.search
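# Note (illustrative sketch, not part of the original script): the keys in
# mapp contain regex metacharacters such as '.' and '/' and are not escaped
# above; a more robust way to build the same alternation would be:
#   pattern = re.compile("|".join(re.escape(key) for key in mapp))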
queue = Queue.Queue()
result = []
# Start the worker threads (only one is started here)
for i in range(1):
    w = Worker()
    w.setDaemon(1)
    w.start()
chunks = get_chunks(FILE_R, 10 * 1024 * 1024)
for chunk in chunks:
    print chunk
    queue.put((FILE_R, chunk))
queue.join()
with open(FILE_WR, "w") as f:
    for file_chunk in range(len(result)):
        for line in result[file_chunk]:
            f.write("%sn" % line)

print time.time() - start_time

So I think the problem is that the chunks don't come back out of the queue in order, so they are not synchronized. Is there any way I can synchronize them? Thanks for your help!
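One way to make the output independent of thread scheduling is to tag every chunk with its index when it is queued and to write the results back sorted by that index. Below is a minimal sketch of that idea, reusing the process and get_chunks functions from above; OrderedWorker, results, and results_lock are illustrative names, not part of the original code:

import threading, Queue

queue = Queue.Queue()
results = {}                     # chunk index -> list of processed lines
results_lock = threading.Lock()  # guard the dict against concurrent writes

class OrderedWorker(threading.Thread):
    def run(self):
        while 1:
            item = queue.get()
            if item is None:
                break
            index, file_r, chunk = item
            lines = process(file_r, chunk)  # the process() defined above
            with results_lock:
                results[index] = lines
            queue.task_done()

for i in range(4):               # any number of workers is now safe
    w = OrderedWorker()
    w.setDaemon(1)
    w.start()

for index, chunk in enumerate(get_chunks(FILE_R, 10 * 1024 * 1024)):
    queue.put((index, FILE_R, chunk))
queue.join()

# Writing in sorted index order restores the original chunk order no matter
# which worker finished first.
with open(FILE_WR, "w") as f:
    for index in sorted(results):
        for line in results[index]:
            f.write("%s\n" % line)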

I think I found the problem:

read_group_list = f.read(chunk[1]).splitlines()

This line in the process function creates the problem: the earlier f.read(chunk[1]) has already advanced the file position, so this second read returns the bytes of the following chunk instead. I replaced it with:

read_group_list = s.splitlines()

and now it gives me the correct file.
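For reference, this is what the process function looks like with that fix applied (same logic as before; only the else branch changed to reuse the string that was already read):

def process(file_r, chunk):
    read_group_list = []
    with open(file_r, "r") as f:
        f.seek(chunk[0])
        s = f.read(chunk[1])  # this read advances the file position
        if len(pattern.findall(s)) > 0:
            read_group_list = process_line_by_line(file_r, chunk)
        else:
            # Reuse s: a second f.read(chunk[1]) here would return the
            # bytes of the NEXT chunk and scramble the output.
            read_group_list = s.splitlines()
    return read_group_list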
