I have a class Processor that takes in some input data (which we are going to call examples), processes the input data, and outputs the results. At a high level it looks like this:
from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2, model_path):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = load_model_from_path(model_path)

    def process_all_examples(self, all_examples):
        all_results = []
        pool = multiprocessing.Pool(4)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_examples), total=len(all_examples)):
            all_results.append(result)
        return all_results

    def process_single_example(self, example):
        # do some complicated calculations on the example that use
        # self.arg1, self.arg2, and self.model
        return result
The idea is that the processor is initialized once (loading the model takes a long time) and can then take advantage of a multi-core machine to process the input examples. The above doesn't work, because class methods are not picklable for multiprocessing. After consulting the following StackOverflow posts:
Call multiprocessing in class method Python
Multiprocessing: How to use Pool.map on a function defined in a class?
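To make the cost concrete, here is a minimal standalone sketch of my own (Holder is just a stand-in, not from those posts): on recent Python 3 a bound method actually is picklable, but pickling it serializes the entire instance it is bound to, model and all, so the method can only reach a worker by dragging the whole object with it; on Python 2 the pickling fails outright, which is the error those posts discuss.

import pickle

class Holder:
    def __init__(self):
        self.model = list(range(1000000))  # stands in for a large model

    def work(self, example):
        return example

h = Holder()
print(len(pickle.dumps(h)))       # several MB: the instance carries the model
print(len(pickle.dumps(h.work)))  # about the same: the bound method drags self along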
I came up with the following solution:
from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2, model_path):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = load_model_from_path(model_path)

    def process_all_examples(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        pool = multiprocessing.Pool(4)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs), total=len(all_inputs)):
            all_results.append(result)
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        # do some complicated calculations on the example that use
        # self.arg1, self.arg2, and self.model
        return result
However, this didn't work either. If I try to run process_all_examples, it gets stuck at .imap_unordered. For testing purposes I tried the same thing with some dummy data/processing to figure out what was happening, and instead of getting stuck, the multiprocessing was just very slow:
from tqdm import tqdm
import multiprocessing

class Processor:
    def __init__(self, arg1, arg2):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = [i for i in range(1000)]

    def process_all_examples_multi(self, all_examples, nproc=4):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        pool = multiprocessing.Pool(nproc)
        for result in tqdm(pool.imap_unordered(self.process_single_example, all_inputs), total=len(all_inputs)):
            all_results.append(result)
        return all_results

    def process_all_examples_single(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        for _input in tqdm(all_inputs):
            all_results.append(self.process_single_example(_input))
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        result = self.arg1 * self.arg2 * self.model[3] * example
        return result

processor = Processor(-1, 2)
all_examples = list(range(100000))
results = processor.process_all_examples_multi(all_examples)   # slower
results = processor.process_all_examples_single(all_examples)  # faster
Adding a chunksize parameter to .imap_unordered (with values between 100 and 10000) seems to improve performance significantly, but it never beats just using a single core without multiprocessing.Pool.
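To see where the bytes go, here is a rough sketch (using the toy Processor above; the real Pool wraps its messages a little differently): each (self, example) tuple re-serializes the whole instance, and since a chunk is pickled as one message, pickle's memo lets the chunk share a single copy of self. That would be consistent with chunksize helping a lot while still losing to a single core: every chunk still ships the model, and the per-item compute is tiny.

import pickle

processor = Processor(-1, 2)
one_item = pickle.dumps((processor, 42))
one_chunk = pickle.dumps([(processor, e) for e in range(100)])
print(len(one_item), "bytes for one work item")
print(len(one_chunk), "bytes for a 100-item chunk (self serialized once via the memo)")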
I know there are other options, one being to restructure my code and another being to use global variables, but I can't shake the feeling that I'm simply missing something. I have also tried using the pathos.multiprocessing module from the pathos library, without success.
With multiprocessing, you have to think about the size of the payload passed from parent to child relative to the work being done. Since you are on a forking operating system, parent and child share the same memory at the moment the pool is created. But you aren't really taking advantage of that, because you pass self and its data (your model) to the child for every work item to be processed.
You can set up some global state that the workers know about and keep the data there. Anything large lives in that global state, and the only thing passed through the pool is an index into the current worker data. Adding a chunksize reduces communication overhead, so it is good to add when you have many work items that all take relatively similar amounts of time to compute.
Multiprocessing has overhead: if the computation per work item is small, it isn't worth it. In this example I bumped up the work done with an extra for loop, on the assumption that your real work is fairly sizeable. If it isn't, the pool really doesn't help.
from tqdm import tqdm
import multiprocessing
import threading

# will hold (Processor, example set) for process_all_examples_multi
_process_this = None
_process_this_lock = threading.Lock()

class Processor:
    def __init__(self, arg1, arg2):
        self.arg1 = arg1
        self.arg2 = arg2
        # load model from very large file that will take some time
        self.model = [i for i in range(1000)]

    def process_all_examples_multi(self, all_examples, nproc=4):
        # set up memory state for processing pool
        with _process_this_lock:
            global _process_this
            _process_this = (self, all_examples)
            # context manager deletes pool when done
            with multiprocessing.Pool(nproc) as pool:
                all_results = list(tqdm(pool.imap_unordered(
                    self.process_single_example_2, range(len(all_examples)), chunksize=100),
                    total=len(all_examples)))
        return all_results

    def process_all_examples_single(self, all_examples):
        all_results = []
        all_inputs = [(self, example) for example in all_examples]
        for _input in tqdm(all_inputs):
            all_results.append(self.process_single_example(_input))
        return all_results

    @staticmethod
    def process_single_example(inputs):
        self, example = inputs
        result = self.arg1 * self.arg2 * self.model[3] * example
        # let's simulate more work
        for i in range(10000):
            pass
        return result

    @staticmethod
    def process_single_example_2(example_index):
        processor, examples = _process_this
        result = processor.arg1 * processor.arg2 * processor.model[3] * examples[example_index]
        # let's simulate more work
        for i in range(10000):
            pass
        return result

processor = Processor(-1, 2)
all_examples = list(range(100000))
results = processor.process_all_examples_multi(all_examples)
# vs
results = processor.process_all_examples_single(all_examples)
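A variant worth mentioning (a sketch, not part of the code above): the same idea of keeping the big state out of the per-task payload can be expressed with Pool's initializer argument, which also works under start methods other than fork (e.g. spawn on Windows and macOS), because the state crosses to each worker exactly once at pool startup. The helper names here are illustrative; Processor is the class from above.

from tqdm import tqdm
import multiprocessing

_worker_state = None  # set once per worker by _init_worker

def _init_worker(processor, examples):
    # runs in every worker as it starts; stashes the big objects globally
    global _worker_state
    _worker_state = (processor, examples)

def _work(example_index):
    processor, examples = _worker_state
    return processor.arg1 * processor.arg2 * processor.model[3] * examples[example_index]

def process_all_examples_initializer(processor, all_examples, nproc=4):
    with multiprocessing.Pool(nproc, initializer=_init_worker,
                              initargs=(processor, all_examples)) as pool:
        return list(tqdm(pool.imap_unordered(_work, range(len(all_examples)),
                                             chunksize=100),
                         total=len(all_examples)))

if __name__ == "__main__":
    processor = Processor(-1, 2)
    results = process_all_examples_initializer(processor, list(range(100000)))

Under spawn the initargs are pickled once per worker, so the model is copied nproc times rather than once per work item; under fork it is simply inherited.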