我有一个巨大的XML文件(几乎5GIG)。我尝试在整个文件中搜索,找到一些标签并将其重命名。我在这里使用了相同的想法将文件块分成10兆字节块,搜索每个块,如果该块包含搜索项,然后将块发送给另一个助手,以通过行读取块并替换标签。这是行不通的!似乎当试图合并队列并将文件写回去时,结果文件从任意的某个地方开始。
import re, threading, Queue
FILE_R = "C:\Users\USOMZIADesktop\ABB_Work\ERCOT\Modifying_cim_model\omid2.xml"
FILE_WR = "C:\Users\USOMZIADesktop\ABB_Work\ERCOT\Modifying_cim_model\x3.xml"
def get_chunks(file_r, size = 1024 * 1024):
with open(file_r, 'rb') as f:
while 1:
start = f.tell()
f.seek(size, 1)
s = f.readline()
yield start, f.tell() - start
if not s:
break
def process_line_by_line(file_r, chunk):
with open(file_r, "rb") as f:
f.seek(chunk[0])
read_line_list = []
for line_f in f.read(chunk[1]).splitlines():
find_match = False
for match_str in mapp:
if match_str in str(line_f):
find_match = True
new_line = str(line_f).replace(match_str, mapp[match_str])
read_line_list.append(new_line)
break
if not find_match:
read_line_list.append(str(line_f))
return read_line_list
def process(file_r, chunk):
read_group_list = []
with open(file_r, "r") as f:
f.seek(chunk[0])
s = f.read(chunk[1])
if len(pattern.findall(s)) > 0:
read_group_list = process_line_by_line(file_r, chunk)
else:
read_group_list = f.read(chunk[1]).splitlines()
return read_group_list
class Worker(threading.Thread):
def run(self):
while 1:
chunk = queue.get()
if chunk is None:
break
result.append(process(*chunk))
queue.task_done()
import time, sys
start_time = time.time()
pattern_list = []
mapp = {"cim:ConformLoad rdf:ID": "cim:CustomerLoad rdf:ID", "cim:Load rdf:ID": "cim:CustomerLoad rdf:ID", "cim:NonConformLoad rdf:ID": "cim:CustomerLoad rdf:ID",
"cim:InductionMotorLoad rdf:ID": "cim:CustomerLoad rdf:ID", "cim:NonConformLoadGroup rdf:ID": "cim:ConformLoadGroup rdf:ID",
"cim:NonConformLoad.LoadGroup": "cim:ConformLoad.LoadGroup",
"/cim:ConformLoad>": "/cim:CustomerLoad>", "/cim:Load>": "/cim:CustomerLoad>", "/cim:NonConformLoad>": "/cim:CustomerLoad>",
"/cim:InductionMotorLoad>": "/cim:CustomerLoad>", "/cim:NonConformLoadGroup>": "/cim:ConformLoadGroup>"}
reg_string =""
for key in mapp:
reg_string = reg_string + key+ "|"
# to delete the last |
reg_string = list(reg_string)[:-1]
reg_string = ''.join(reg_string)
pattern = re.compile(r"cim:%s.*" %reg_string)
# This makes it faster than write an mo = pattern.search(line) in the loop
search = pattern.search
queue = Queue.Queue()
result = []
# Start the multithread
for i in range(1):
w = Worker()
w.setDaemon(1)
w.start()
chunks = get_chunks(FILE_R, 10 * 1024 * 1024)
for chunk in chunks:
print chunk
queue.put((FILE_R, chunk))
queue.join()
with open(FILE_WR, "w") as f:
for file_chunk in range(len(result)):
for line in result[file_chunk]:
f.write("%sn" % line)
print time.time() - start_time
因此,我认为问题是当队列完成工作时,它们不是以某种方式的顺序,因此它没有同步。无论如何我能以某种方式同步它们吗?谢谢您的帮助!
我想我发现了问题是:
read_group_list = f.read(chunk[1]).splitlines()
该过程函数中的这一行创建了Proble。我替换为:
read_group_list = s.splitlines()
现在给我正确的文件。