Any way to share a (class) instance argument when multiprocessing in Python?



I made a simple example to describe the problem I am facing.

Code example 1:

from multiprocessing import Pool
from functools import partial

metric     # a class instance holding several pandas dataframes as attributes; defined globally
variables  # a list of size 2000k

def get_reward(variable_, metric_, *args):
    ...
    reward = metric_.some_method(variable_)
    return reward

func_ = partial(get_reward, metric_=metric)
mp_workers = Pool(64)
map_ = mp_workers.map(func_, variables)

Code example 2:

from multiprocessing import Pool
from functools import partial

metric     # a class instance holding several pandas dataframes as attributes; defined globally
variables  # a list of size 2000k

def get_reward(variable_, *args):
    ...
    reward = metric.some_method(variable_)
    return reward

mp_workers = Pool(64)
map_ = mp_workers.map(get_reward, variables)

In short, the difference between the two examples is this: in the second, the `get_reward` function does not take the class instance (`metric`) as an argument but uses it as a global, whereas in the first, `get_reward` takes the class instance as an argument.

I find that the first code runs very slowly, while the second works fine. The reason is probably that the first code copies the large class instance to every worker, which takes considerable time.

In my actual code, I can only have a structure like the first code example, where the `get_reward` function must take the class instance as an argument.

Is there any way to speed up code example 1?

Thanks.

Each Python process lives in its own address space. It cannot share data directly with another process; instead, bytes must be transferred from one process to another using pipes and queues. There is no way to directly "share" an object such as a dataframe. The standard library functions use Python's pickle mechanism to encode an object as a byte stream, send it to another process, and reconstruct it as an object on the other end.

In the case of the metric object, however, the example you wrote will pickle/transfer/unpickle it 2000k times, once for each item in variables. That is what the map function does. Obviously this is very inefficient.
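To get a feel for why repeated pickling hurts, here is a minimal sketch. It uses a large plain list as a dependency-free stand-in for the dataframe-holding `metric` object (the name `heavy` and the sizes are illustrative assumptions, not from the original code):

```python
import pickle
import time

# Hypothetical stand-in for a heavy object like `metric`:
# a list of one million integers instead of pandas dataframes.
heavy = list(range(1_000_000))

start = time.perf_counter()
for _ in range(10):
    data = pickle.dumps(heavy)  # roughly what happens once per task
elapsed = time.perf_counter() - start

print(f"10 pickles of ~{len(data)} bytes took {elapsed:.2f}s")
```

Multiply that per-pickle cost by 2000k tasks (plus the transfer and unpickle on the worker side) and the slowdown of code example 1 is easy to believe.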

One way to improve this is to initialize each process once, giving it its own copy of the dataframes. Individual data items are placed on a queue and pulled off by a process as soon as it finishes the previous one, so no extra pickling takes place. You lose the convenience of the map function, but that's life.

It is best to create only as many processes as you have CPU cores, since nothing beats keeping all the cores busy.

Here is a silly little program that creates two processes, initializes them with a data object and a queue, and sets them running. The timestamps show that the two processes really do run in parallel.

from multiprocessing import Process, SimpleQueue
import time
from datetime import datetime

class Process1(Process):
    def __init__(self, queue, an_obj):
        self.queue = queue
        self.an_obj = an_obj
        super().__init__()
        print("Process1", an_obj, self.pid)  # pid is None before start()

    def run(self):
        while not self.queue.empty():
            print(self.queue.get(), datetime.now(), self.an_obj)
            time.sleep(1.0)

def main():
    q = SimpleQueue()
    p1 = Process1(q, "I wish I were a dataframe")
    p2 = Process1(q, "I wish I were a dataframe too")
    for n in range(20):
        q.put(n)
    p1.start()
    p2.start()
    p1.join()
    p2.join()

if __name__ == "__main__":
    main()

Output:

Process1 I wish I were a dataframe None
Process1 I wish I were a dataframe too None
0 2022-09-07 18:52:14.988650 I wish I were a dataframe
1 2022-09-07 18:52:15.004269 I wish I were a dataframe too
2 2022-09-07 18:52:15.990152 I wish I were a dataframe
3 2022-09-07 18:52:16.005771 I wish I were a dataframe too
4 2022-09-07 18:52:17.005312 I wish I were a dataframe
5 2022-09-07 18:52:17.020999 I wish I were a dataframe too
6 2022-09-07 18:52:18.005842 I wish I were a dataframe
7 2022-09-07 18:52:18.021450 I wish I were a dataframe too
8 2022-09-07 18:52:19.006774 I wish I were a dataframe
9 2022-09-07 18:52:19.022367 I wish I were a dataframe too
10 2022-09-07 18:52:20.007715 I wish I were a dataframe
11 2022-09-07 18:52:20.023279 I wish I were a dataframe too
12 2022-09-07 18:52:21.022944 I wish I were a dataframe
13 2022-09-07 18:52:21.038544 I wish I were a dataframe too
14 2022-09-07 18:52:22.023039 I wish I were a dataframe
15 2022-09-07 18:52:22.038665 I wish I were a dataframe too
16 2022-09-07 18:52:23.024069 I wish I were a dataframe
17 2022-09-07 18:52:23.039694 I wish I were a dataframe too
18 2022-09-07 18:52:24.025000 I wish I were a dataframe
19 2022-09-07 18:52:24.040682 I wish I were a dataframe too
