所以我既看了多处理模块的文档,也看了这里提出的其他问题,似乎没有一个与我的情况相似,因此我开始了一个新问题。
为简单起见,我有一段形式的代码:
# simple dataframe of some users and their properties.
data = {'userId': [1, 2, 3, 4],
'property': [12, 11, 13, 43]}
df = pd.DataFrame.from_dict(data)
# a function that generates permutations of the above users, in the form of a list of lists
# such as [[1,2,3,4], [2,1,3,4], [2,3,4,1], [2,4,1,3]]
user_perm = generate_permutations(nr_perm=4)
# a function that computes some relation between users
def comp_rel(df, permutation, user_dict):
df1 = df.userId.isin(permutation[0])
df2 = df.userId.isin(permutation[1])
user_dict[permutation[0]] += permutation[1]
return user_dict
# and finally a loop:
user_dict = defaultdict(int)
for permutation in user_perm:
user_dict = comp_rel(df, permutation, user_dict)
我知道这段代码现在几乎没有意义(如果有的话(,但我只是写了一个接近我正在处理的实际代码结构的小示例。该user_dict
最终应该包含userIds
和一些价值。
我有实际的代码,它工作正常,给出了正确的字典和一切,但是......它在单个线程上运行。而且它非常慢,请记住,我还有另外 15 个线程完全免费。
我的问题是,如何使用 python 的multiprocessing
模块来更改最后一个 for 循环,并能够在所有可用的线程/内核上运行?我看了文档,不是很容易理解。
编辑:我正在尝试使用游泳池作为:
p = multiprocessing.Pool(multiprocessing.cpu_count())
p.map(comp_rel(df, permutation, user_dict), user_perm)
p.close()
p.join()
但是这中断了,因为我正在使用以下行:
user_dict = comp_rel(df, permutation, user_dict)
在初始代码中,我不知道池完成后应该如何合并这些字典。
在评论中进行了简短的讨论后,我决定使用以下ProcessPoolExecutor
发布解决方案:
import concurrent.futures
from collections import defaultdict
def comp_rel(df, perm):
...
return perm[0], perm[1]
user_dict = defaultdict(int)
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = {executor.submit(comp_rel, df, perm): perm for perm in user_perm}
for future in concurrent.futures.as_completed(futures):
try:
k, v = future.result()
except Exception as e:
print(f"{futures[future]} throws {e}")
else:
user_dict[k] += v
它的工作原理与@tzaman相同,但它为您提供了处理异常的可能性。此外,此模块中还有更多有趣的功能,请查看文档。
您的comp_rel
有两个部分需要分开 - 首先是计算本身,它正在为某些用户ID计算一些值。第二个是"累积"步骤,它将该值添加到user_dict
结果中。
您可以分离计算本身,以便它返回一个(id, value)
元组并使用多处理将其进行农场化,然后将结果累积到主线程中:
from multiprocessing import Pool
from functools import partial
from collections import defaultdict
# We make this a pure function that just returns a result instead of mutating anything
def comp_rel(df, perm):
...
return perm[0], perm[1]
comp_with_df = partial(comp_rel, df) # df is always the same, so factor it out
with Pool(None) as pool: # Pool(None) uses cpu_count automatically
results = pool.map(comp_with_df, user_perm)
# Now add up the results at the end:
user_dict = defaultdict(int)
for k, v in results:
user_dict[k] += v
或者,您也可以将Manager().dict()
对象直接传递到处理函数中,但这有点复杂,可能不会为您提供任何额外的速度。
根据@Masklinn的建议,这里有一个稍微好一点的方法来避免内存开销:
user_dict = defaultdict(int)
with Pool(None) as pool:
for k, v in pool.imap_unordered(comp_with_df, user_perm):
user_dict[k] += v
这样,我们在结果完成时将它们相加,而不必先将它们全部存储在中间列表中。