Python multiprocessing pool.map 不并行工作



我写了一个简单的并行python程序

import multiprocessing as mp
import time
def test_function(i):
print("function starts" + str(i))
time.sleep(1)
print("function ends" + str(i))
if __name__ == '__main__':
pool = mp.Pool(mp.cpu_count())
pool.map(test_function, [i for i in range(4)])
pool.close()
pool.join()

我希望在输出中看到的内容:

function starts0
function starts2
function starts1
function starts3
function ends1
function ends3
function ends2
function ends0

我实际看到的:

function starts1
function ends1
function starts3
function ends3
function starts2
function ends2
function starts0
function ends0

当我查看输出时,它看起来像pool.map运行一个函数并等待它完成然后运行另一个函数,但是当我计算整个程序的持续时间时,它大约是 2 秒,除非test_function并行运行,否则这是不可能的


编辑:

此代码在MacOS和Linux中运行良好,但在Windows 10上未显示预期的输出。 Python 版本是 3.6.4

multiprocessing.Pool()文档(从此,包括 Py27)清楚地表明,有意阻止处理由迭代器生成的仅 -4- 调用集创建的调用队列,该队列是从上面发布的示例按顺序生成的

multiprocessing-module 文档对其Pool.map()方法进行了说明:

map(func, iterable[, chunksize])

map()内置函数的并行等效项(虽然它只支持一个可迭代参数)。它会阻止,直到结果准备就绪。

这应该是观察到的行为,而不同的实例化方法会产生不同的附加(进程复制相关)开销成本。

无论如何,mp.cpu_count()不一定是任何此类调度的 CPU 内核数.Pool()-实例工作线程的任务将继续执行,因为 O/S(与用户/进程相关的限制策略)设置的亲和力:

您的代码必须"服从"这些 CPU 内核的子集,这些内核允许由任何此类multiprocessing请求的子进程利用,
其数量不高于:len( os.sched_getaffinity( 0 ) )


最佳下一步:重新评估整个代码执行生态系统

import multiprocessing as mp                                            # file: ~/p/StackOverflow_multiprocessing.py
import time, os, platform, inspect                                      # https://stackoverflow.com/questions/58738716/python-multiprocessing-pool-map-doesnt-work-parallel/58755642
def test_function( i = -1 ):
pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
pass;                                                                                                callerframerecord = inspect.stack()[1] # 1 represents line at caller
pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )
pass;                                                               _CALLER_ = inspect.getframeinfo( callerframerecord[0] )
print( "{0:_>30.10f} ::".format(              time.monotonic() ),
"PID:{0:} with PPID:{1:} runs".format( os.getpid(), os.getppid() ),
"{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i ),
"invoked from {0:}()-LINE[{1:_>4d}]".format(                 _CALLER_.function, _CALLER_.lineno )
)
time.sleep( 10 )
pass;                                                                                                  thisframerecord = inspect.stack()[0] # 0 represents this line
pass;                                                                 _INFO_ = inspect.getframeinfo(   thisframerecord[0] )                 # 1 represents line at caller
print( "{0:_>30.10f} ::".format(              time.monotonic() ),
"PID:{0:} with PPID:{1:} ends".format( os.getpid(), os.getppid() ),
"{0:}( i = {2:} )-LINE[{1:_>4d}],".format(                     _INFO_.function,   _INFO_.lineno, i )
)
if __name__ == '__main__':
print( "{0:} cores reported by {1:}".format( mp.cpu_count(), "mp.cpu_count()" ) )
print( "{0:} cores permit'd by {1:}".format( os.sched_getaffinity(0), "os.sched_getaffinity(0)" ) )
print( "O/S sub-process instantiation methods {0:} available".format( mp.get_all_start_methods() ) )
print( "O/S will use this instantiation method {0:}".format( mp.get_start_method() ) )
print( "{0:_>30.10f} :: will call .Pool()".format( time.monotonic() ) )
#------mp.Pool()-----------------------------------------------------
pool = mp.Pool( mp.cpu_count() )
print( "{0:_>30.10f} :: pool.map() to be called".format( time.monotonic() ) )
#---.map()--------------------------------------?
#---.map(                                       ?
pool.map( test_function, [i for i in range(4) ] )
#---.map(                                       ?
#---.map()--------------------------------------?
print( "{0:_>30.10f} :: pool.map() call RETd".format( time.monotonic() ) )
pool.close()
#---.close()
print( "{0:_>30.10f} :: pool.close()-d".format( time.monotonic() ) )
pool.join()
#---.join()
print( "{0:_>30.10f} :: pool.join()-d".format( time.monotonic()          ) )
print( "EXECUTED on {0:}".format(              platform.version()        ) )
print( "USING: python-{0:}:".format(           platform.python_version() ) )

在 Linux 类 O/S 上可能看起来像这样的东西:

(py3) Fri Nov 08 14:26:40 :~$ python ~/p/StackOverflow_multiprocessing.py
8 cores reported by mp.cpu_count()
{0, 1, 2, 3} cores permit'd by os.sched_getaffinity(0)
O/S sub-process instantiation methods ['fork', 'spawn', 'forkserver'] available
O/S will use this instantiation method fork
____________1284931.1678911699 :: will call .Pool()
____________1284931.2063829789 :: pool.map() to be called
____________1284931.2383207241 :: PID:15848 with PPID:15847 runs test_function( i = 0 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2506985001 :: PID:15849 with PPID:15847 runs test_function( i = 1 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2614207701 :: PID:15851 with PPID:15847 runs test_function( i = 2 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284931.2671745829 :: PID:15850 with PPID:15847 runs test_function( i = 3 )-LINE[___7], invoked from mapstar()-LINE[__44]
____________1284941.2504994699 :: PID:15848 with PPID:15847 ends test_function( i = 0 )-LINE[__16],
____________1284941.2550825749 :: PID:15849 with PPID:15847 ends test_function( i = 1 )-LINE[__16],
____________1284941.2698363690 :: PID:15851 with PPID:15847 ends test_function( i = 2 )-LINE[__16],
____________1284941.2776791099 :: PID:15850 with PPID:15847 ends test_function( i = 3 )-LINE[__16],
____________1284941.2780045229 :: pool.map() call RETd
____________1284941.2780527000 :: pool.close()-d
____________1284941.3343055181 :: pool.join()-d
EXECUTED on #1 SMP oSname M.m.n-o.p (YYYY-MM-DD)
USING: python-3.5.6:

检查隐藏的细节 -您的操作系统用于调用test_function()-mapstar()(不是普遍确定的选择)是本地SMP-linux类O/S对其默认子进程实例化方法的选择,通过">fork"执行。

我怀疑您可能在多处理中遇到了一个常见的问题:

从多个线程(或进程)打印到共享日志/屏幕的执行(同时)可能会产生令人困惑的结果!

这也解释了为什么您会看到不同的行为取决于操作系统。 不同的操作系统将以略有不同的方式解决此问题。底层缓冲方案、访问控制等将有所作为。

您可能正在获得预期的多处理,但您的打印输出可能会产生误导。

我知道您提供了此代码作为示例,以演示现实世界的问题。因此,只需返回原始代码并再次考虑上述经常被忽视的事实:打印(或记录到文件)正在访问共享资源。您可能需要锁定或排队或其他技术。在不知道您真正问题的细节的情况下,就无法提出更多建议。

相关内容

  • 没有找到相关文章

最新更新