我想使用 mutliprocessing.pool 方法并行化计算。问题是我想在计算中使用的函数呈现了两个参数和可选的 kwarg,第一个参数是数据帧,第二个参数是 str,任何 kwargs 是字典。
我想使用的数据帧和字典对于我尝试执行的所有计算都是相同的,只是第二个参数不断变化。因此,我希望能够使用 map 方法将其作为不同字符串的列表传递给已经打包的带有 df 和 dict 的函数。
from utils import *
import multiprocessing
from functools import partial
def sumifs(df, result_col, **kwargs):
compare_cols = list(kwargs.keys())
operators = {}
for col in compare_cols:
if type(kwargs[col]) == tuple:
operators[col] = kwargs[col][0]
kwargs[col] = list(kwargs[col][1])
else:
operators[col] = operator.eq
kwargs[col] = list(kwargs[col])
result = []
cache = {}
# Go through each value
for i in range(len(kwargs[compare_cols[0]])):
compare_values = [kwargs[col][i] for col in compare_cols]
cache_key = ','.join([str(s) for s in compare_values])
if (cache_key in cache):
entry = cache[cache_key]
else:
df_copy = df.copy()
for compare_col, compare_value in zip(compare_cols, compare_values):
df_copy = df_copy.loc[operators[compare_col](df_copy[compare_col], compare_value)]
entry = df_copy[result_col].sum()
cache[cache_key] = entry
result.append(entry)
return pd.Series(result)
if __name__ == '__main__':
ca = read_in_table('Tab1')
total_consumer_ids = len(ca)
base = pd.DataFrame()
base['ID'] = range(1, total_consumer_ids + 1)
result_col= ['A', 'B', 'C']
keywords = {'Z': base['Consumer archetype ID']}
max_number_processes = multiprocessing.cpu_count()
with multiprocessing.Pool(processes=max_number_processes) as pool:
results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)
print(results)
但是,当我运行上面的代码时,出现以下错误:TypeError: sumifs() missing 1 required positional argument: 'result_col'
.我怎样才能为函数提供第一个 arg 和 kwarg,同时提供第二个参数作为 str 列表,以便我可以平行计算?我在论坛上读过几个类似的问题,但似乎没有一个解决方案适用于这种情况......
谢谢,如果有什么不清楚的地方,我今天才知道多处理包!
让我们看一下代码的两个部分。
首先是sumifs
函数声明:
def sumifs(df, result_col, **kwargs):
其次,使用相关参数调用此函数。
# Those are the params
ca = read_in_table('Tab1')
keywords = {'Z': base['Consumer archetype ID']}
# This is the function call
results = pool.map(partial(sumifs, a=ca, kwargs=keywords), tasks)
更新 1:
编辑原始代码后。看起来问题是位置参数赋值,请尝试丢弃它。
替换以下行:
results = pool.map(partial(sumifs, a=ca, kwargs=keywords), result_col)
跟:
results = pool.map(partial(sumifs, ca, **keywords), result_col)
示例代码:
import multiprocessing
from functools import partial
def test_func(arg1, arg2, **kwargs):
print(arg1)
print(arg2)
print(kwargs)
return arg2
if __name__ == '__main__':
list_of_args2 = [1, 2, 3]
just_a_dict = {'key1': 'Some value'}
with multiprocessing.Pool(processes=3) as pool:
results = pool.map(partial(test_func, 'This is arg1', **just_a_dict), list_of_args2)
print(results)
将输出:
This is arg1
1
{'key1': 'Some value'}
This is arg1
2
{'key1': 'Some value'}
This is arg1
2
{'key1': 'Some value'}
['1', '2', '3']
有关如何使用具有多个参数和 kwarg 的函数进行多处理池的更多示例
更新 2:
扩展示例(由于注释(:
但是,我想知道,以同样的方式,如果我的函数有三个 args 和 kwarg,并且我想保留 arg1、arg3 和 kwargs costant,我怎么能将 arg2 作为多处理的列表传递?从本质上讲,我将如何启动该映射的多处理(部分(test_func,'这将是arg1','这将是arg3',**just_a_dict(,arg2(部分中的第二个值对应于arg3而不是arg2?
更新 1代码将更改如下:
# The function signature
def test_func(arg1, arg2, arg3, **kwargs):
# The map call
pool.map(partial(test_func, 'This is arg1', arg3='This is arg3', **just_a_dict), list_of_args2)
这可以使用 python位置和关键字分配来完成。 请注意,kwargs
被搁置在一边,不使用关键字分配,尽管它位于关键字分配的值之后。
有关参数赋值差异的详细信息,请参阅此处。
如果有一段数据在所有工作/作业中都是恒定/固定的,那么最好在创建池期间使用此固定数据"初始化"池中的进程,并映射不同的数据。 这样可以避免在每个作业请求时重新发送固定数据。 在您的情况下,我会执行以下操作:
df = None
kw = {}
def initialize(df_in, kw_in):
global df, kw
df, kw = df_in, kw_in
def worker(data):
# computation involving df, kw, and data
...
...
with multiprocessing.Pool(max_number_processes, intializer, (base, keywords)) as pool:
pool.map(worker, varying_data)
此要点包含使用初始值设定项的完整示例。 这篇博客文章介绍了使用初始值设定项的性能提升。