为什么ProcessPoolExecutor一直在运行



我尝试使用pythonProcessPoolExecutor来计算一些FFT并行,请参阅以下代码:

import concurrent.futures
import numpy as np
from scipy.fft import fft
def fuc(sig):
C = fft(sig,axis=-1) 
return C
def main()
P, M, K = 20, 30, 1024
FKP = np.array([P,M,K],dtype='cdouble')
fkp = np.array([P,M,K],dtype='float32')
fkp = np.random.rand(P,M,K)
with concurrent.futures.ProcessPoolExecutor(max_workers=4) as ex:
results = ex.map(fuc,(fkp[p,m].reshape(1,K) for p in range(P) for m in range(M)))
FKP = list(results)
if __name__ == '__main__':
main()

问题:

  1. 为什么内核一直很忙,但我没有看到来自windows任务管理器的4个工人
  2. 我是否使用正确的方式来获得行"中的并行计算结果;FKP=列表(结果(">

Q1:
"为什么内核一直很忙,但我没有看到来自windows任务管理器的4个工人">

A1:
让我们在代码本身中解决这个问题:

import os
import time
...
def fuc( sig ):
print( ( "INF[{0:}]: fuc() starts   "
+ "running in process[{1:}]"
+ "-called-from-process[{2:}]"
).format( time.get_perf_ns(), os.getpid(), os.getppid() )
)
C = fft( sig, axis = -1 )
print( ( "INF[{0:}]: fuc() FFT done "
+ "running in process[{1:}]"
+ "-called-from-process[{2:}]"
).format( time.get_perf_ns(), os.getpid(), os.getppid() )
)
return C

该代码将自行记录计划中FFT部分的实际计算时间、内容和时间。


Q2:
"我是否使用正确的方式来获得行"中的并行计算结果;FKP=列表(结果(">

A2:
是的,但每个SER/COMMS/DES进程到处理边界跨越都有一组显著的附加开销成本,其中所有数据都经过SER/DES编码(pickle.dumps()-类似于[TIME]-+[SPACE]-域中的CPU/RAM成本+非零ipc-p2p-传输时间(:

def Pinf():
print( ( "NEW[{0:}]: ProcessPoolExecutor process-pool has "
+ "started process[{1:}]"
+ "-called-from-process[{2:}]"
).format( time.get_perf_ns(), os.getpid(), os.getppid() )
)
def main():
...
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
print( ( "INF[{0:}]: context-manager"
+ 30*"_" + " entry point"
).format( time.get_perf_ns()
)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
with concurrent.futures.ProcessPoolExecutor( max_workers = 4,
initializer = Pinf
) as ex:
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
print( ( "INF[{0:}]: context-manager"
+ " is to start .map()"
).format( time.get_perf_ns()
)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
results = ex.map( fuc,
( fkp[p,m].reshape( 1, K )
for p   in range( P )
for   m in range( M )
)
)
...
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
print( ( "INF[{0:}]: context-manager"
+ " .map() returned / __main__ has received all <_results_>"
).format( time.get_perf_ns()
)
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
pass
# - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
print( ( "INF[{0:}]: context-manager"
+ 30*"_" + " exited"
).format( time.get_perf_ns()
)
...
print( type( results ) )
...

有关每个进程池进程实例化的实际附加成本,请参阅报告的ns跟踪。详细信息是特定于平台的,因为{MacOS|Linux|Windows}-生成新进程的方法差异很大。这对Python版本也是有效的,因为与Py2和早期版本的Py3.x相比,最近的Py3版本在调用Python解释器过程复制方面做得很好,有些版本复制了调用Python解释器的整个有状态副本,以及数据、文件描述符等的完整副本,从而承担了更大的过程实例化成本,由于用于存储调用Python解释器的n个副本的所有相关联的RAM分配。

给定缩放比例:

>>> len( [ ( p, m ) for p in range( P ) for m in range( M ) ] )
600

效率很重要。仅将索引为子范围的(p_start,p_end,m_start,m_end(的一个元组传递到4个过程,其中将进行信号段的FFT处理并返回其FFT结果的子列表,将避免传递相同的元组,静态数据多次分为小块,完全避免596x通过(CPU-RAM-和延迟(昂贵的SER/COMMS/DES-SED/COMMS/DES ipc-p2p数据通过通道。

有关更多详细信息,您可能希望重新阅读此和此。

相关内容

  • 没有找到相关文章

最新更新