Python multiprocessing doesn't stop running and seems to take even longer



I'm trying to use multiprocessing to speed up some data labeling I'm doing, but noticed that it takes even longer (in fact, I never saw the program terminate). The original script takes about 7 hours to run, but this morning I came to work and noticed it was still running after I had started it the night before.

Task overview

Input:
1) Pandas DataFrame with a column of text
2) Dictionary that looks like {word: label}.
(Desired) Output:
Same DataFrame but this time with the positions of the words marked in the text.
Example:
DataFrame:
----------------------------------------------------
| text
0 | I live in the United States.
----------------------------------------------------
Dict: {'United States': 'country'}
Output DataFrame:
----------------------------------------------------
| text                         | labels
0 | I live in the United States. | [14, 26]
----------------------------------------------------

To explain the result a bit: the substring 'United States' is located at positions 14-26 in the text. I basically iterate over the DataFrame, iterate further over the dictionary, and then use a regex to mark the positions.
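As a quick sanity check of the span arithmetic (not part of the original script): re.finditer reports the span as (14, 27) for this example, since match.end() is exclusive.

import re

text = 'I live in the United States.'
for match in re.finditer('United States', text):
    print(match.span())  # (14, 27) - start inclusive, end exclusive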

What I did

<Original Code>
import re
import pandas as pd
from tqdm import tqdm

def label_data(df, label_dict):
    pbar = tqdm(iterable=df.to_dict('records'), total=df.shape[0])
    for idx, row in enumerate(pbar):
        text = row['text']
        spans = []
        for word, label in label_dict.items():
            for match in re.finditer(word, text):
                start = match.start()
                end = match.end()
                spans.append([start, end])
        row['labels'] = spans
        df.iloc[idx] = row
    return df
<Parallelized Code>
from itertools import product
import multiprocessing as mp
import numpy as np

def label_data_parallel(df, label_dict):
    num_cores = mp.cpu_count() - 1
    pool = mp.Pool(num_cores)
    df_chunks = np.array_split(df, num_cores)
    labeled_dfs = pool.starmap(label_data,
                               product(df_chunks, [label_dict] * num_cores))
    df = pd.concat(labeled_dfs, axis=0)
    pool.close()
    pool.join()
    return df

Is there anything wrong with my code? By the way, the DataFrame has about 200,000 rows and the dictionary has about 3,000 key-value pairs.

Have you considered a different algorithm?

Three ideas:

  1. Instead of iterating over the DataFrame, search in the combined text of all rows at once. That kind of search has been optimized and studied for decades, so it should be fast and is hopefully implemented well in Python's re library. However, so that you don't pick up labels that only appear because two rows were glued together, insert a gibberish separator between rows; I used "@@@@@@". The gibberish separator (which, I admit, doesn't look great) could be replaced by a simple check that a match does not cross a row boundary, skipping the match if it does.
  2. All the keys to search for can also be glued into a single regex pattern, so that all of the work is done by the re library in a more efficient way.
  3. The regex pattern can be optimized into a trie, as described in: Speed up millions of regex replacements in Python 3 (a small illustration follows this list).
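To make idea 3 a bit more concrete, here is a tiny, hand-written illustration of the difference between a plain union pattern and a trie-factored one (the exact pattern the Trie class below generates will differ slightly; this is only indicative):

import re

# Plain union: the regex engine tries each alternative independently.
union_pattern = 'United States|United Kingdom|Uruguay'

# Trie-factored: shared prefixes are merged, so far less backtracking is needed.
trie_like_pattern = r'U(?:nited\ (?:Kingdom|States)|ruguay)'

text = 'I live in the United States.'
print([m.span() for m in re.finditer(union_pattern, text)])      # [(14, 27)]
print([m.span() for m in re.finditer(trie_like_pattern, text)])  # [(14, 27)]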

A full example:

import nltk
import pandas as pd
import re
import string
import random
from nltk.corpus import words

random.seed(1)

# ------- this is just to create nice test data. Otherwise no need in nltk
nltk.download('words')
all_words = words.words()
data_df = pd.DataFrame(
    [
        ' '.join(random.choices(all_words, k=random.randint(1, 20)))
        for _ in range(200000)
    ],
    columns=["text"])
label_keys = {
    random.choice(all_words) for _ in range(3000)
}

# -------- code starts here

class Trie():
    """Regex::Trie in Python. Creates a Trie out of a list of words. The trie can be exported to a Regex pattern.
    The corresponding Regex should match much faster than a simple Regex union."""

    def __init__(self):
        self.data = {}

    def add(self, word):
        ref = self.data
        for char in word:
            ref[char] = char in ref and ref[char] or {}
            ref = ref[char]
        ref[''] = 1

    def dump(self):
        return self.data

    def quote(self, char):
        return re.escape(char)

    def _pattern(self, pData):
        data = pData
        if "" in data and len(data.keys()) == 1:
            return None
        alt = []
        cc = []
        q = 0
        for char in sorted(data.keys()):
            if isinstance(data[char], dict):
                try:
                    recurse = self._pattern(data[char])
                    alt.append(self.quote(char) + recurse)
                except:
                    cc.append(self.quote(char))
            else:
                q = 1
        cconly = not len(alt) > 0
        if len(cc) > 0:
            if len(cc) == 1:
                alt.append(cc[0])
            else:
                alt.append('[' + ''.join(cc) + ']')
        if len(alt) == 1:
            result = alt[0]
        else:
            result = "(?:" + "|".join(alt) + ")"
        if q:
            if cconly:
                result += "?"
            else:
                result = "(?:%s)?" % result
        return result

    def pattern(self):
        return self._pattern(self.dump())

trie_pattern = Trie()
for label in label_keys:
    trie_pattern.add(re.escape(label))
reg_pattern = trie_pattern.pattern()

list_of_texts = list(data_df.text)
indices = list(map(len, list_of_texts))
all_text = "@@@@@@".join(data_df.text)  # @@@@@@ - something of known length you don't expect in the text

all_matches = []
for match_ in re.finditer(reg_pattern, all_text):
    all_matches.append(match_.span())
all_matches.sort(key=lambda x: x[0])

label_l = []
start = 0
all_matches_pointer = 0
indices_pointer = 0
label_l.append([])
while True:
    if all_matches_pointer >= len(all_matches):
        for _ in range(len(label_l), len(data_df)):
            label_l.append([])
        break
    match_start = all_matches[all_matches_pointer][0]
    match_end = all_matches[all_matches_pointer][1]
    if match_start >= start + indices[indices_pointer]:
        label_l.append([])
        start += indices[indices_pointer] + 6  # len("@@@@@@")
        indices_pointer += 1
    else:
        label_l[-1] += [(match_start - start, match_end - start)]
        all_matches_pointer += 1

data_df["labels"] = label_l
data_df

This gives the desired result within a few seconds:

text    labels
0   overempty stirring asyla butchering Sherrymoor  [(5, 6), (19, 21), (42, 43)]
1   premeditator spindliness bilamellate amidosucc...   [(3, 4), (8, 10), (29, 30), (33, 35), (38, 39)...
2   Radek vivicremation rusot noegenetic Shropshir...   [(13, 14), (14, 16), (50, 52), (76, 78), (88, ...
3   uninstructiveness blintze plunging rowiness fi...   [(58, 59), (87, 88), (109, 110), (122, 124), (...
4   memorialize scruffman   [(0, 1), (2, 3), (18, 19)]
... ... ...
199995  emulsor treatiser   [(1, 2), (11, 13)]
199996  squibling anisandrous incorrespondent vague jo...   [(13, 15), (40, 43), (52, 53), (71, 73), (130,...
199997  proallotment bulletheaded uningenuousness plat...   [(0, 5), (8, 9), (44, 46), (62, 65), (75, 77)]
199998  unantiquatedness sulphohalite oversoftness und...   [(6, 10), (32, 35), (65, 67), (68, 71), (118, ...
199999  lenticulothalamic aerometric plastidium panell...   [(14, 15), (22, 23), (31, 33), (38, 39), (46, ...
200000 rows × 2 columns

So I tried your exact parameters (see the code): a DataFrame with 200,000 rows and 3,000 labels. The algorithm runs in only 3-5 seconds on my M1.

Issues left open, which really depend on your input:

  1. What if labels overlap within a single row? Then you would need to add a loop so that each iteration searches for each label separately (and that can be multiprocessed); see the sketch below.
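A minimal sketch of that per-label loop (a hypothetical helper, not part of the code above): each label is searched independently, so overlapping matches within a row are all reported, and the outer loop can be split across processes per label or per chunk of rows.

import re

def spans_per_label(text, label_dict):
    spans = []
    for word in label_dict:                        # one independent pass per label
        for m in re.finditer(re.escape(word), text):
            spans.append((m.start(), m.end()))
    return sorted(spans)

print(spans_per_label('I live in the United States.',
                      {'United States': 'country', 'States': 'state'}))
# [(14, 27), (21, 27)] - 'States' overlaps with 'United States'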

There are several efficiencies to be gained:

  1. You can create a single, optimized regular expression that searches for any of the countries in label_dict and pass that regex to your worker function, instead of calling re.finditer for each country separately. Given that you are searching for 3,000 countries, this should be a significant speedup.
  2. You only need to pass an array of strings, rather than an array of DataFrames, to the worker function along with the aforementioned compiled regex.
  3. By creating a pool of size mp.cpu_count() - 1 you leave the main process a processor to use. However, you are calling starmap, which blocks until all the results have been returned, and while it blocks the main process sits idle. Instead, you could use the method imap, which can begin processing results as soon as the worker function returns something. Still, the amount of processing the main process does may not be enough to justify dedicating a processor to it. In the code below I use all available processors to build the pool, but you can try leaving one for the main process and see whether that is more performant.
  4. The worker function only needs to return the lists of spans it found. The main process then uses this data to add a new column to the original DataFrame.
def label_data(texts, regex):
    return [
        [[match.start(), match.end()] for match in regex.finditer(text)]
        for text in texts
    ]

def label_data_parallel(df, label_dict):
    import multiprocessing as mp
    import numpy as np
    import re
    from functools import partial

    class Trie():
        """Regex::Trie in Python. Creates a Trie out of a list of words. The trie can be exported to a Regex pattern.
        The corresponding Regex should match much faster than a simple Regex union."""

        def __init__(self):
            self.data = {}

        def add(self, word):
            ref = self.data
            for char in word:
                ref[char] = char in ref and ref[char] or {}
                ref = ref[char]
            ref[''] = 1

        def dump(self):
            return self.data

        def quote(self, char):
            return re.escape(char)

        def _pattern(self, pData):
            data = pData
            if "" in data and len(data.keys()) == 1:
                return None
            alt = []
            cc = []
            q = 0
            for char in sorted(data.keys()):
                if isinstance(data[char], dict):
                    try:
                        recurse = self._pattern(data[char])
                        alt.append(self.quote(char) + recurse)
                    except:
                        cc.append(self.quote(char))
                else:
                    q = 1
            cconly = not len(alt) > 0
            if len(cc) > 0:
                if len(cc) == 1:
                    alt.append(cc[0])
                else:
                    alt.append('[' + ''.join(cc) + ']')
            if len(alt) == 1:
                result = alt[0]
            else:
                result = "(?:" + "|".join(alt) + ")"
            if q:
                if cconly:
                    result += "?"
                else:
                    result = "(?:%s)?" % result
            return result

        def pattern(self):
            return self._pattern(self.dump())

    num_cores = mp.cpu_count()
    text_chunks = np.array_split(df['text'].values.tolist(), num_cores)
    trie = Trie()
    for country in label_dict.keys():
        trie.add(country)
    regex = re.compile(trie.pattern())
    pool = mp.Pool(num_cores)
    label_spans = []
    for spans in pool.imap(partial(label_data, regex=regex), text_chunks):
        label_spans.extend(spans)
    pool.close()
    pool.join()
    df['labels'] = label_spans

    return df

def main():
    import pandas as pd

    df = pd.DataFrame({'text': [
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'I live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
        'France is where I live But I used to live in the United States.',
    ]})
    label_dict = {
        'United States': 'country',
        'France': 'country',
    }
    label_data_parallel(df, label_dict)
    print(df)

if __name__ == '__main__':
    main()

It prints:

text              labels
0                        I live in the United States.          [[14, 27]]
1                        I live in the United States.          [[14, 27]]
2                        I live in the United States.          [[14, 27]]
3                        I live in the United States.          [[14, 27]]
4                        I live in the United States.          [[14, 27]]
5                        I live in the United States.          [[14, 27]]
6                        I live in the United States.          [[14, 27]]
7                        I live in the United States.          [[14, 27]]
8   France is where I live But I used to live in t...  [[0, 6], [49, 62]]
9   France is where I live But I used to live in t...  [[0, 6], [49, 62]]
10  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
11  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
12  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
13  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
14  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
15  France is where I live But I used to live in t...  [[0, 6], [49, 62]]
