我有一个包含报纸文章的大型XML文件,我想通过使用多处理有效地标记这些文章。
XML 文件非常简单,如下所示:
<?xml version="1.0" encoding="utf-8"?>
<corpus>
<text id="1">
<body>
<title>Some headline words</title>
<p>A sentence. Another sentence.</p>
<p>Third sentence.</p>
</body>
</text>
<text id="2">
<body>
<title>Some other headline words</title>
<p>A stupid sentence. Another stupid sentence.</p>
<p>Third stupid sentence.</p>
</body>
</text>
...
</corpus>
到目前为止,我使用iterparse
中的ElementTree
解析文件。但是,由于任务非常尴尬地并行,我考虑过另外使用多处理。所以我想保持iterparse
的低内存消耗优势,但想将文本元素的处理拆分到多个处理器。顺序对我来说并不重要。
我使用以下代码尝试了它,但它以TypeError: 'NoneType' object is not iterable
退出:
def text_to_tokens(text_elem):
text_id = text_elem.get("id")
tokens = [tokenize(elem.text) for elem in text_elem.find("./body")]
with open(f"{text_id}.txt", "w") as file:
file.write(str(tokens))
def tokenize(string):
return string.split(" ")
if __name__ == "__main__":
with multiprocessing.Pool(processes=4) as pool:
context = iter(ET.iterparse("corpus_file.xml", events=("start", "end")))
event, root = next(context)
for event, elem in context:
if event == "end" and elem.tag == "text":
pool.map(text_to_tokens, elem)
root.clear()
如何让多处理工作?非常感谢任何其他提示或方法,我只想并行化此任务并使其尽可能快。
完整的退出代码如下所示:
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/home/daniel/miniconda3/lib/python3.7/multiprocessing/pool.py", line 121, in worker
result = (True, func(*args, **kwds))
File "/home/daniel/miniconda3/lib/python3.7/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "stack_overflow_test.py", line 15, in text_to_tokens
tokens = [tokenize(elem.text) for elem in text_elem]
TypeError: 'NoneType' object is not iterable
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "stack_overflow_test.py", line 30, in <module>
pool.map(text_to_tokens, elem)
File "/home/daniel/miniconda3/lib/python3.7/multiprocessing/pool.py", line 268, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/home/daniel/miniconda3/lib/python3.7/multiprocessing/pool.py", line 657, in get
raise self._value
TypeError: 'NoneType' object is not iterable
单独的令牌提取和文件创建。使用concurrent.futures
处理并发详细信息。
更改text_to_tokens
以考虑body
元素并返回数据,而不是创建文件。
def text_to_tokens(text_elem):
text_id = text_elem.get("id")
fname = f"{text_id}.txt"
tokens = []
for elem in text_elem.iter():
if elem.tag in ('text','body'):
continue
tokens.append(tokenize(elem.text))
return fname,tokens
用于创建文件的新功能。
def write(fname, data):
with open(fname,'w') as f:
f.write(str(data))
在单独的进程中提取令牌,并在线程中写入文件。
if __name__ == "__main__":
context = iter(ET.iterparse(f, events=("start", "end")))
event, root = next(context)
token_futures = []
write_futures = []
with ProcPoolExc() as ppe, ThreadPoolExc() as tpe:
for event, elem in context:
if event == "end" and elem.tag == "text":
token_futures.append(ppe.submit(text_to_tokens, elem))
for future in concurrent.futures.as_completed(token_futures):
fname,data = future.result()
write_futures.append(tpe.submit(write, *(fname,data)))
for fut in concurrent.futures.as_completed(write_futures):
e = fut.exception()
print('success' if not e else e)
设置:
import concurrent.futures, io
import xml.etree.ElementTree as ET
ProcPoolExc = concurrent.futures.ProcessPoolExecutor
ThreadPoolExc = concurrent.futures.ThreadPoolExecutor
s = '''<?xml version="1.0" encoding="utf-8"?>
<corpus>
<text id="1">
<body>
<title>Some headline words</title>
<p>A sentence. Another sentence.</p>
<p>Third sentence.</p>
</body>
</text>
<text id="2">
<body>
<title>Some other headline words</title>
<p>A stupid sentence. Another stupid sentence.</p>
<p>Third stupid sentence.</p>
</body>
</text>
<text id="3">
<body>
<title>Some other headline words</title>
<p>A stupid sentence. Another stupid sentence.</p>
<p>Third stupid sentence.</p>
</body>
</text>
<text id="4">
<body>
<title>Some other headline words</title>
<p>A stupid sentence. Another stupid sentence.</p>
<p>Third stupid sentence.</p>
</body>
</text>
</corpus>'''
f = io.StringIO(s)
def tokenize(string):
return string.split(" ")
如果您在没有with multiprocessing.Pool(processes=4) as pool:
的情况下尝试此代码会发生什么?
行得通吗?
如果不是,则问题出在您如何使用多处理之外。
如果是,请尝试在创建池之前完成迭代器的初始化。 即:在初始化池之前放置以下内容。
context = iter(ET.iterparse("corpus_file.xml", events=("start", "end")))
event, root = next(context)
多处理 pickles 对象,因此如果在创建池后初始化,则可能无法正确传递。