我写了一个简单的并行python程序
import multiprocessing as mp
import time
def test_function(i):
print("function starts" + str(i))
time.sleep(1)
print("function ends" + str(i))
if __name__ == '__main__':
pool = mp.Pool(mp.cpu_count())
pool.map(test_function, [i for i in range(4)])
pool.close()
pool.join()
我希望在输出中看到的内容:
function starts0
function starts2
function starts1
function starts3
function ends1
function ends3
function ends2
function ends0
我实际看到的:
function starts1
function ends1
function starts3
function ends3
function starts2
function ends2
function starts0
function ends0
当我查看输出时,它看起来像pool.map
运行一个函数并等待它完成然后运行另一个函数,但是当我计算整个程序的持续时间时,它大约是 2 秒,除非test_function
并行运行,否则这是不可能的
编辑:
此代码在MacOS和Linux中运行良好,但在Windows 10上未显示预期的输出。 Python 版本是 3.6.4
multiprocessing.Pool()
文档(从此,包括 Py27)清楚地表明,有意阻止处理由迭代器生成的仅 -4- 调用集创建的调用队列,该队列是从上面发布的示例按顺序生成的
。multiprocessing
-module 文档对其Pool.map()
方法进行了说明:
map(func, iterable[, chunksize])
map()
内置函数的并行等效项(虽然它只支持一个可迭代参数)。它会阻止,直到结果准备就绪。
这应该是观察到的行为,而不同的实例化方法会产生不同的附加(进程复制相关)开销成本。
无论如何,mp.cpu_count()
不一定是任何此类调度的 CPU 内核数.Pool()
-实例工作线程的任务将继续执行,因为 O/S(与用户/进程相关的限制策略)设置的亲和力:
您的代码必须"服从"这些 CPU 内核的子集,这些内核允许由任何此类multiprocessing
请求的子进程利用,
其数量不高于:len( os.sched_getaffinity( 0 ) )
最佳下一步:重新评估整个代码执行生态系统
import multiprocessing as mp # file: ~/p/StackOverflow_multiprocessing.py
import time, os, platform, inspect # https://stackoverflow.com/questions/58738716/python-multiprocessing-pool-map-doesnt-work-parallel/58755642
def test_function( i = -1 ):
pass; thisframerecord = inspect.stack()[0] # 0 represents this line
pass; callerframerecord = inspect.stack()[1] # 1 represents line at caller
pass; _INFO_ = inspect.getframeinfo( thisframerecord[0] )
pass; _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
print( "{0:_>30.10f} ::".format( time.monotonic() ),
"PID:{0:} with PPID:{1:} runs".format( os.getpid(), os.getppid() ),
"{0:}( i = {2:} )-LINE[{1:_>4d}],".format( _INFO_.function, _INFO_.lineno, i ),
"invoked from {0:}()-LINE[{1:_>4d}]".format( _CALLER_.function, _CALLER_.lineno )
)
time.sleep( 10 )
pass; thisframerecord = inspect.stack()[0] # 0 represents this line
pass; _INFO_ = inspect.getframeinfo( thisframerecord[0] ) # 1 represents line at caller
print( "{0:_>30.10f} ::".format( time.monotonic() ),
"PID:{0:} with PPID:{1:} ends".format( os.getpid(), os.getppid() ),
"{0:}( i = {2:} )-LINE[{1:_>4d}],".format( _INFO_.function, _INFO_.lineno, i )
)
if __name__ == '__main__':
print( "{0:} cores reported by {1:}".format( mp.cpu_count(), "mp.cpu_count()" ) )
print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity(0), "os.sched_getaffinity(0)" ) )
print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )
#------mp.Pool()-----------------------------------------------------
pool = mp.Pool( mp.cpu_count() )
print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )
#---.map()--------------------------------------?
#---.map( ?
pool.map( test_function, [i for i in range(4) ] )
#---.map( ?
#---.map()--------------------------------------?
print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )
pool.close()
#---.close()
print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )
pool.join()
#---.join()
print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic() ) )
print( "EXECUTED on {0:}".format( platform.version() ) )
print( "USING: python-{0:}:".format( platform.python_version() ) )
在 Linux 类 O/S 上可能看起来像这样的东西:
(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0, 1, 2, 3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork', 'spawn', 'forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],
____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],
____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],
____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],
____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d
EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:
检查隐藏的细节 -您的操作系统用于调用test_function()
-mapstar()
(不是普遍确定的选择)是本地SMP-linux类O/S对其默认子进程实例化方法的选择,通过">fork
"执行。
我怀疑您可能在多处理中遇到了一个常见的问题:
从多个线程(或进程)打印到共享日志/屏幕的执行(同时)可能会产生令人困惑的结果!
这也解释了为什么您会看到不同的行为取决于操作系统。 不同的操作系统将以略有不同的方式解决此问题。底层缓冲方案、访问控制等将有所作为。
您可能正在获得预期的多处理,但您的打印输出可能会产生误导。
我知道您提供了此代码作为示例,以演示现实世界的问题。因此,只需返回原始代码并再次考虑上述经常被忽视的事实:打印(或记录到文件)正在访问共享资源。您可能需要锁定或排队或其他技术。在不知道您真正问题的细节的情况下,就无法提出更多建议。