我正在尝试使用Python的多处理映射功能。我已经将地图调用放置在子功能中,因为我需要循环循环较大的数据集以将其划分并在较小的块上调用地图。
我的问题是Time.Sleep(5)线被多次称为"测试!"正在打印5次(在开始时似乎相等一次,然后在循环数量 *过程数量的数量上进行2 * 2),即使它的水平高于多处理调用。不过,与此同时,我期望的是CSV输出,因此RunParallel()按预期运行并称为预期次数。
from multiprocessing import Pool
import numpy as np
import os,csv,copy,time
from AuxFuncs import *
def master():
time.sleep(5)
print('Test!')
for mult in [1,10]:
runParallel(mult)
def runParallel(mult):
randIntInputs = list()
for i in range(5): randIntInputs.append((np.random.randint(10)*mult,mult))
if __name__=='__main__':
p = Pool(processes=2)
results = p.map(testFunc,randIntInputs)
p.close()
p.join()
valsToSave = [list(result[0]) for result in results]
write2dListToCSV(valsToSave,'output' + str(mult) + '.csv')
def testFunc(inputs):
return np.random.randint(1,10,5) * inputs[0],inputs[1]
master()
,输出为:
Test!
Test!
Test!
Test!
Test!
我认为问题可能是我将池调用放入一个函数中,但是即使我将其移出功能,我也有相同的问题("测试!"是通过以下代码打印3次的3次。)
from multiprocessing import Pool
import numpy as np
import os,csv,copy,time
from AuxFuncs import *
def testFunc(inputs):
return np.random.randint(1,10,5) * inputs[0],inputs[1]
print('Test!')
mult,randIntInputs = 5,list()
for i in range(5): randIntInputs.append((np.random.randint(10)*mult,mult))
if __name__=='__main__':
p = Pool(processes=2)
results = p.map(testFunc,randIntInputs)
p.close()
p.join()
valsToSave = [list(result[0]) for result in results]
write2dListToCSV(valsToSave,'output' + str(mult) + '.csv')
编辑:谢谢您的帮助。看起来这是有效的:
来自多处理导入池导入numpy作为NP导入OS,CSV,复制,时间从auxfuncs导入 *
def master():
if __name__=='__main__':
time.sleep(5)
print('Test!')
for mult in [1,10]:
runParallel(mult)
def runParallel(mult):
randIntInputs = list()
for i in range(5): randIntInputs.append((np.random.randint(10)*mult,mult))
# if __name__=='__main__':
p = Pool(processes=2)
results = p.map(testFunc,randIntInputs)
p.close()
p.join()
valsToSave = [list(result[0]) for result in results]
write2dListToCSV(valsToSave,'output' + str(mult) + '.csv')
def testFunc(inputs):
return np.random.randint(1,10,5) * inputs[0],inputs[1]
master()
这里可能发生的事情是每个过程都试图导入您调用的函数。发生这种情况时,它会运行任何在定义之外称为或不被if
屏蔽的功能,包括您对master
的调用。将if __name__ ...
放入定义中,将您不使用它来屏蔽其他操作。我认为您要寻找的是这样的:
def master():
time.sleep(5)
print('Test!')
for mult in range(1, 11):
runParallel(mult)
def runParallel(mult):
randIntInputs = list()
for i in range(5): randIntInputs.append((np.random.randint(10)*mult,mult))
with Pool(processes=2) as p:
results = p.map(testFunc,randIntInputs)
valsToSave = [list(result[0]) for result in results]
write2dListToCSV(valsToSave,'output' + str(mult) + '.csv')
def testFunc(inputs):
return np.random.randint(1,10,5) * inputs[0],inputs[1]
if __name__ == '__main__':
master()
此和您的最后一个更新之间的区别是,在您的更新主题中,每个过程仍调用Master,它只是没有做任何事情,因为if
语句未评估True
;但是在此代码中,它仅调用master
一次,然后每次被if
语句阻止。差异并不大,但此版本更实用。
顺便说一句,我利用with
语句自由地将您的池放在上下文管理器中。一旦上下文退出,这将自动关闭Pool
。我还删除了.join()
,因为Pool().map()
功能已经暂停了主线程,直到它返回为止。最后,我将您在Master中创建的临时列表更改为呼叫range
。range
用于在输入的两个输入之间创建一个数字序列,包括左侧,但不是向右。使用单个参数,它将使用0作为起点,然后升至指定的数字;range(10) => 0 1 2 3 4 5 6 7 8 9
。