我正在尝试同时运行Bert模型的多个副本。
我有一个包含池的 python 对象:
self.tokenizer = BertTokenizer.from_pretrained(BERT_LARGE)
self.model = BertForQuestionAnswering.from_pretrained(BERT_LARGE)
self.pool = Pool(processes=max_processes,
initializer=pool_init,
initargs=(self.model, self.tokenizer))
池中的每个进程都跨 Bert 分词器和模型复制:
process_model = None
process_tokenizer = None
def pool_init(m: BertForQuestionAnswering, t: BertTokenizer):
global process_model, process_tokenizer
process_model, process_tokenizer = m, t
要使用池,我然后运行
while condition:
answers = self.pool.map(answer_func, questions)
condition = check_condition(answers)
这种设计是为了避免每次初始化池时将 Bert 模型重新加载到每个进程中的巨大开销(每个进程大约需要 1.5-2 秒)。
问题1.这是最好的方法吗?
问题2.如果是这样,我什么时候应该打电话给self.pool.close()
和self.pool.join()
?我想在check_condition()
函数之前join()
,但我真的不想close()
池(除非直到对象的__del__()
),但是在调用close()
之前调用join()
会给我错误,并且调用close()
会使池将来无法调用。池不是为这类工作而设计的,我应该管理一系列流程吗?帮助。。。?
谢谢!!
你说,"这种设计是为了避免每次初始化池时将 Bert 模型重新加载到每个进程中的巨大开销(每个进程大约需要 1.5-2 秒)。 您的陈述和您展示的少量代码对我来说并不完全有意义。我认为这是一个术语问题。
首先,我没有看到池在哪里多次初始化;我只看到一个创建池的实例:
self.pool = Pool(processes=max_processes,
initializer=pool_init,
initargs=(self.model, self.tokenizer))
但是,如果您多次创建池,那么您当前的设计实际上是使用pool_init
函数在每次创建池时将 Bert 模型重新加载到池的每个进程中,而不是避免您所说的要避免的内容。但这可能是一件好事。所以我怀疑我们在谈论两件不同的事情。所以我只能解释一下到底发生了什么:
由于循环while condition:
,您可能会多次调用pool.map
函数。但是,通常,如果可以避免多次创建池,则确实希望避免这样做。现在,我可以想到两个原因来对Pool
构造函数使用初始值设定项和initargs参数:
- 如果您有工作线程函数(在您的例子中
answer_func
)需要访问的只读数据项,而不是在每次调用该函数时传递这些项,通常使用这些数据项初始化池中每个进程的全局变量并让工作线程函数仅访问全局变量会更便宜。 - 某些数据类型(例如
multiprocessing.Lock
实例)不能使用任何multiprocessing.Pool
方法作为参数传递,并且需要使用池初始化函数"传递"。
案例2似乎不适用。因此,如果您有 100 个问题且池大小为 8,则最好通过模型和分词器8次,池中的每个进程一次,而不是100次,每个问题一次。
如果使用方法Pool.map
,则在所有提交任务完成之前会阻止,则可以确保当该方法返回时,池中没有进程运行任何任务。如果要重新执行池创建代码,则在终止while condition:
循环时,应通过调用pool.close()
后跟pool.join()
来释放资源,这将等待池中的进程终止,或者您可以调用pool.terminate()
,这会立即终止所有池进程(我们知道此时处于空闲状态)。如果您只创建池一次,则实际上不必调用任何内容;池中的进程是守护进程,当主进程终止时,守护进程将终止。但是,如果在不再需要池后将执行进一步处理,则为了尽早释放资源,则应执行前面描述的"清理"。
这有意义吗?
附加说明
由于pool.map
块直到所有提交任务完成,因此无需调用pool.join()
以确保任务已完成;pool.map
将返回 worker 函数返回的所有返回值的列表。answer_func
.
除了释放我已经提到的资源之外,pool.join()
有用的地方是当您发出一个或多个pool.apply_async
方法调用时。此方法是非阻塞的,并返回一个AsyncResult
实例,稍后可以在该实例上发出get
调用以阻止以完成任务并获取返回值。但是,如果您对返回值不感兴趣,只需要等待任务完成,那么只要您不需要向池提交更多任务,您只需发出一个pool.close()
,然后发出一个pool.join()
,在完成这两个调用时,您可以确定所有提交的任务都已完成(可能有例外)。
因此,在类的__del__
方法中调用pool.terminate()
对于一般用法来说是一个好主意。