Changing a shared object in Python multiprocessing



Assume a dummy.txt file containing the following information:

a_to_b_from_c 20
a_to_b_from_w 30
a_to_b_from_k 20
a_to_b_from_l 10
c_to_f_from_e 30
c_to_f_from_k 20
c_to_f_from_l 10

(the group values are only 10, 20 and 30) and the following code:

import multiprocessing

global conns

class Line():
    def __init__(self, text, group) -> None:
        self.text = text
        self.group = int(group)

    def get_link(self):
        return self.text.split('_from')[0]

    def __repr__(self):
        return f"<{self.group},{self.text}>"

class Groups():
    def __init__(self, name) -> None:
        self.name = name
        self.groups = {k: set() for k in [10, 20, 30]}

    def add_to_dict(self, line: Line):
        connection = line.get_link()
        if connection not in self.groups.keys():
            self.groups[connection] = set()
        self.groups[connection].add(line.text)

def thread_f(item: Line):
    # Update the dictionary of every Group object accordingly
    global conns
    key = item.get_link()
    conns[key].add_to_dict(item)

def main():
    global conns
    # Parse the file and store the information in an iterable
    with open('dummy.txt') as f:
        info = [Line(*line.strip().split()) for line in f]
    # Update the global (shared) object and initialize a dictionary
    # this has the following initialization:
    # { a_to_b : set(), c_to_f : set() }
    conns = {k: Groups(k) for k in {x.get_link() for x in info}}
    # Update the shared object according to the iterable information
    with multiprocessing.Pool(5) as pool:
        res = pool.map(thread_f,      # add to the appropriate key the items
                       info,          # the lines
                       chunksize=1)   # respect the order
    # Display the Results
    for group_name, group_obj in conns.items():
        print(f"Grp Name {group_name} has the following:")
        for cost, connections in group_obj.groups.items():
            print(cost, connections)

if __name__ == "__main__":
    main()

What I want to do is first parse the file and generate a Line object for each of its lines. Once parsing is complete, I update the global variable conns, which I intend to use as a variable shared by all the workers in the pool. Then, in the thread_f function, I update the global variable (the dictionary) by adding each Line to the dictionary field of the appropriate Groups object.

The problem is that when I try to display the information, nothing shows up. Instead, I get a bunch of empty sets:

Grp Name a_to_b has the following:
10 set()
20 set()
30 set()
Grp Name c_to_f has the following:
10 set()
20 set()
30 set()

What I was expecting instead:

Grp Name a_to_b has the following
10 set(a_to_b_from_l)
20 set(a_to_b_from_c,a_to_b_from_k)
30 set(a_to_b_from_w)
Grp Name c_to_f has the following:
10 set(c_to_f_from_l)
20 set(c_to_f_from_k)
30 set(c_to_f_from_e)

Since Python multiprocessing effectively uses a fork approach, I do understand that the child processes have access to the information already initialized by the parent, but their changes have no effect on the parent. After reading the documentation and searching S.O., I found out about the Manager objects of the multiprocessing package. The problem is that I did not manage to create a Manager.dict() that is already initialized (the way I did with the comprehension for conns).
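A minimal sketch of that fork behavior, using a hypothetical counter dict (not part of the code above): every pool worker mutates its own copy-on-write copy of the global, so the parent's copy is never touched.

import multiprocessing

counter = {"hits": 0}  # plain global; each forked child gets its own copy

def work(_):
    counter["hits"] += 1  # mutates only the child's copy

if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        pool.map(work, range(10))
    print(counter["hits"])  # prints 0: the parent never sees the children's updates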

How can I achieve the desired behavior described above?

Yes, but why multiprocessing?

This example is just an MWE I created to mimic the functionality of my actual code. In fact, I am trying to speed up code that does not scale well for really large input files.

About the Manager

Driven by a slightly similar question here [1], I did not manage to find a way to initialize a Manager.dict() from a pre-existing (i.e., already initialized) dictionary to pass into the spawned processes. Hence I used sets, which guarantee that there will be no duplicate entries, and a global that is initialized up front and is continuously updated by the processes.
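For what it is worth, Manager().dict() accepts an initial mapping just like the built-in dict constructor, so a pre-existing dictionary can be handed over directly. A minimal sketch with a stand-in dictionary:

import multiprocessing

if __name__ == "__main__":
    plain = {"a_to_b": set(), "c_to_f": set()}  # already-initialized dict
    manager = multiprocessing.Manager()
    shared = manager.dict(plain)                # managed copy of the existing dict
    print(dict(shared))                         # {'a_to_b': set(), 'c_to_f': set()}

Note that in-place mutation of a value nested inside the proxy is still not propagated; the workaround, used in the answer below, is to read the value out, modify it, and assign it back to its key.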

A workaround to the proposed Manager approach

Well, since the computational effort increases with the use of a shared resource (i.e., the Manager.dict object), could a potential solution be something like the following?

The idea is:

  • Avoid using a shared resource between the processes
  • Thereby avoid race conditions

So, suppose that our pool of worker processes consists of X processes in total. The task of each worker is to classify each Line into the appropriate Groups object. Take d to be len(list[Line]) / X. Then, if we use a somewhat divide-and-conquer approach like this:

+--------------------------------------------------------------+
|                        list[Line]                            |
+--------------------------------------------------------------+
|        |        |        |        |        |        |        |
|        |        |        |        |        |        |        |
<-d->    <-d->    <-d->     ...                       <-d->

The list is divided into iterables of size d. Each of these iterables is then fed to a worker for processing. But now the thread_f function has to be modified accordingly.

It should:

  • Generate a dictionary of Groups, where key = line.group and value = a Groups object
  • Populate this dictionary from the Line objects of the chunk/slice/sublist it is given
  • Return the dictionary

After the pool of procs has finished, the results (i.e., the dictionaries) must be merged into a single dictionary holding the final solution; a minimal sketch of this approach follows.
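A minimal sketch of that idea, reusing the Line class from above (worker_chunk and merge_partials are illustrative names, and plain nested dicts keyed by link and then by group stand in for the Groups class):

import multiprocessing

def worker_chunk(chunk):
    # Each worker fills its own private, picklable dict:
    # { link : { group : set(texts) } } -- no shared state, no locks
    local = {}
    for line in chunk:
        local.setdefault(line.get_link(), {}).setdefault(line.group, set()).add(line.text)
    return local

def merge_partials(partials):
    # The parent merges the per-chunk dictionaries into the final solution
    merged = {}
    for partial in partials:
        for link, groups in partial.items():
            for group, texts in groups.items():
                merged.setdefault(link, {}).setdefault(group, set()).update(texts)
    return merged

def run(info, X=5):
    d = (len(info) + X - 1) // X  # chunk size: ceil(len(info) / X)
    chunks = [info[i:i + d] for i in range(0, len(info), d)]
    with multiprocessing.Pool(X) as pool:
        return merge_partials(pool.map(worker_chunk, chunks))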

First of all, I think there is a bug in the method Groups.add_to_dict. I have commented out the erroneous statement and added the correct one right after it:

import multiprocessing

def init_processes(d, the_lock):
    global conns, lock
    conns, lock = d, the_lock

class Line():
    def __init__(self, text, group) -> None:
        self.text = text
        self.group = int(group)

    def get_link(self):
        return self.text.split('_from')[0]

    def __repr__(self):
        return f"<{self.group},{self.text}>"

class Groups():
    def __init__(self, name) -> None:
        self.name = name
        self.groups = {k: set() for k in [10, 20, 30]}

    def add_to_dict(self, line: Line):
        #connection = line.get_link()
        connection = line.group
        if connection not in self.groups.keys():
            self.groups[connection] = set()
        self.groups[connection].add(line.text)

def thread_f(item: Line):
    # Update the dictionary of every Group object accordingly
    global conns  # Not strictly necessary
    key = item.get_link()
    # We need to let the managed dict know there is an updated value for the key:
    """
    conns[key].add_to_dict(item)
    """
    with lock:
        the_set = conns[key]
        the_set.add_to_dict(item)
        conns[key] = the_set  # reset the reference

def main():
    # Parse the file and store the information in an iterable
    with open('dummy.txt') as f:
        info = [Line(*line.strip().split()) for line in f]
    # Update the global (shared) object and initialize a dictionary
    # this has the following initialization:
    # { a_to_b : set(), c_to_f : set() }
    conns = multiprocessing.Manager().dict(
        {k: Groups(k) for k in {x.get_link() for x in info}}
    )
    # Update the shared object according to the iterable information
    lock = multiprocessing.Lock()
    with multiprocessing.Pool(5, initializer=init_processes, initargs=(conns, lock)) as pool:
        res = pool.map(thread_f,      # add to the appropriate key the items
                       info,          # the lines
                       chunksize=1)   # respect the order
    # Display the Results
    for group_name, group_obj in conns.items():
        print(f"Grp Name {group_name} has the following:")
        for cost, connections in group_obj.groups.items():
            print(cost, connections)

if __name__ == "__main__":
    main()

Prints:

Grp Name c_to_f has the following:
10 {'c_to_f_from_l'}
20 {'c_to_f_from_k'}
30 {'c_to_f_from_e'}
Grp Name a_to_b has the following:
10 {'a_to_b_from_l'}
20 {'a_to_b_from_k', 'a_to_b_from_c'}
30 {'a_to_b_from_w'}

Update

I may be way off base here, but most of the work I see seems to be in parsing the input lines. In your real-world situation, whatever that may be, the parsing may represent an insignificant portion of your total processing (if it is not, then, as I mentioned earlier in a comment, multiprocessing is not a good fit for this problem), but I see no reason not to move that processing into the multiprocessing pool itself.

I have refactored the code quite drastically, moving the line parsing into the ProcessedLine class; the Groups class is no longer needed, since the merging of the dictionaries and sets is done by the main process.

I use the method imap_unordered rather than imap because it is generally slightly more efficient, and your previous code did not use the return value of the map method, which is the only reason to care about the order in which results are generated. Consequently, keys may be added to the dictionary in arbitrary order. Why did you need the contents of the dictionary to be ordered in the first place?

You should be aware that if your input file has a great number of lines and your worker function requires non-trivial processing, it is possible to fill the multiprocessing task queue faster than the processes can empty it and to run out of memory. I do have a solution for that, but that is another story.
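(Not that withheld solution, just one common way to bound the queue: a hypothetical helper that feeds imap_unordered in fixed-size windows, so at most max_in_flight lines are ever pending.)

from itertools import islice

def bounded_imap_unordered(pool, func, iterable, max_in_flight=10_000, chunksize=1):
    # Submit the input in windows of max_in_flight items, draining each
    # window's results before submitting the next, to cap queue memory.
    it = iter(iterable)
    while True:
        window = list(islice(it, max_in_flight))
        if not window:
            break
        yield from pool.imap_unordered(func, window, chunksize=chunksize)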

import multiprocessing

class ProcessedLine():
    def __init__(self, text: str) -> None:
        self.text, group = text.strip().split()
        self.group = int(group)
        self.link = text.split('_from')[0]
        self.dict = {self.link: {self.group: set([self.text])}}

def process_line(text: str):
    processed_line = ProcessedLine(text)
    return processed_line

def compute_chunksize(iterable_size, pool_size):
    chunksize, remainder = divmod(iterable_size, 4 * pool_size)
    if remainder:
        chunksize += 1
    return chunksize

def main():
    def generate_lines():
        with open('dummy.txt') as f:
            for line in f:
                yield line

    ESTIMATED_NUMBER_OF_LINES_IN_FILE = 7
    POOL_SIZE = min(ESTIMATED_NUMBER_OF_LINES_IN_FILE, multiprocessing.cpu_count())
    # chunksize to be used with imap_unordered:
    chunksize = compute_chunksize(ESTIMATED_NUMBER_OF_LINES_IN_FILE, POOL_SIZE)
    pool = multiprocessing.Pool(POOL_SIZE)
    # Specify a chunksize value if the size of the iterable is large
    results = {}
    for processed_line in pool.imap_unordered(process_line, generate_lines(), chunksize=chunksize):
        link = processed_line.link
        if link not in results:
            # Just update with the entire dictionary
            results.update(processed_line.dict)
        else:
            # Update the set dictionary:
            set_dict = results[link]
            set_key = processed_line.group
            if set_key in set_dict:
                set_dict[set_key].add(processed_line.text)
            else:
                #set_dict[set_key] = set(processed_line.text)
                set_dict[set_key] = processed_line.dict[link][set_key]
    pool.close()
    pool.join()
    for group_name, groups in results.items():
        print(f'Group Name {group_name} has the following:')
        for k, v in groups.items():
            print('   ', k, '->', v)
        print()

if __name__ == "__main__":
    main()

Prints:

Group Name a_to_b has the following:
20 -> {'a_to_b_from_c', 'a_to_b_from_k'}
30 -> {'a_to_b_from_w'}
10 -> {'a_to_b_from_l'}
Group Name c_to_f has the following:
30 -> {'c_to_f_from_e'}
20 -> {'c_to_f_from_k'}
10 -> {'c_to_f_from_l'}

Latest update