在进行多处理器处理时,我很难修改数据帧。这是我的代码的简化版本:
data_attrib.loc[:, 'Id'] = ['' for _ in range(len(data_attrib))]
def myfunction(i):
data_attrib.at[i, 'Id'] = i
print(data_attrib.at[i, 'Id'])
import multiprocessing
processes = []
for i in data_attrib.index:
#launch multiprocessing
pro = multiprocessing.Process(target = myfunction, args = [i])
pro.start()
processes.append(pro)
for process in processes:
process.join()
如果不进行多重处理,该函数将完美工作。然而,对于多处理,它正确地执行函数,但列data_attrib["Id"]仍然为空。
有人能告诉我为什么以及如何修理它吗。谢谢
您有几个问题。首先,当您进行多处理时,启动进程的代码必须位于由if __name__ == '__main__':
测试管理的块内。否则,当启动子进程时,进程启动代码将被重新执行,并且子进程将在无限递归循环中启动更多的子进程。
其次,与线程不同,每个进程都在自己的内存空间中运行,因此它修改的任何全局变量都不会反映在其他进程中。例如,这里尝试通过将列表作为参数传递来纠正这两个问题(尽管只是使用简单的列表而不是数据帧来演示(:
import multiprocessing
def myfunction(l, i):
l[i] += 1
def main():
l = [i for i in range(3)]
processes = []
for i in range(3):
#launch multiprocessing
pro = multiprocessing.Process(target = myfunction, args = (l, i))
pro.start()
processes.append(pro)
for process in processes:
process.join()
print(l)
if __name__ == '__main__':
main()
打印:
[0, 1, 2]
原始列表未被修改,因为传递给子进程的是列表的副本。相反,必须修改程序,使子进程返回修改后的值,并由主进程本身进行列表的实际更新。但是,使用Process
获取返回值并不是那么简单。为了获得返回值,最好使用来自multiprocessing.pool
模块的Pool
类或来自concurrent.futures
模块的ProcessPoolExecutor
类。
但是,通过允许多个进程通过代理对对象的单个副本进行操作,还有另一种方法可以在不传递返回值的情况下完成您需要做的事情。这是由multiprocessing
模块中的Manager
类提供的。
import multiprocessing
def myfunction(l, i):
l[i] += 1
def main():
with multiprocessing.Manager() as manager:
l = manager.list()
for i in range(3):
l.append(i)
processes = []
for i in range(3):
#launch multiprocessing
pro = multiprocessing.Process(target = myfunction, args = (l, i))
pro.start()
processes.append(pro)
for process in processes:
process.join()
print(l)
if __name__ == '__main__':
main()
打印:
[1, 2, 3]
你显然需要阅读Manager
课程,看看它如何适应你的特定问题。Manager
类为您提供了选择,但不是无限的数字。如果你想继续使用数据帧,你可能需要发挥创意。
以下代码不使用Manager
类,每个子进程都在处理自己的输入数据副本。相反,每个进程将其修改后的值返回给主进程,由主进程重新组装最终结果:
from concurrent.futures import ProcessPoolExecutor
l = [i for i in range(3)]
def myfunction(i):
return l[i] + 1
def main():
with ProcessPoolExecutor(max_workers=3) as executor:
results = executor.map(myfunction, range(3))
for i, result in enumerate(results):
l[i] = result
print(l)
if __name__ == '__main__':
main()