有没有办法在嵌套函数或模块中使用multiprocessing.pool?



感谢您查看此内容。我承认我已经涉足 python 的并行处理整整 1 周了,所以如果我错过了一个明显的解决方案,我深表歉意。我有一段代码,我想运行 mp.pool() 的几个不同实例。那些在主.py文件中称为的那些工作正常,但是当我尝试将它们添加到模块中的函数时,我没有得到它们的输出。该应用程序只是运行它并继续。 我认为这可能与这篇文章有关,但它没有给出任何关于完成我需要的替代方法的想法。在一个简单的示例中工作的代码是这样的:

import multiprocessing as mp
def multiproc_log_result(retval):
results.append(retval)
if len(results) % (10 // 10) == 0:
print('{0}% done'.format(100 * len(results) / 10))
def meat():
print 'beef'
status = True
return status
results = []
pool = mp.Pool(thread_count)
for x in range(10):
pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()

def veggie():
print 'carrot'
status = True
return status
results = []
pool = mp.Pool(thread_count)
for x in range(10):
pool.apply_async(veggie, callback=multiproc_log_result)
pool.close()
pool.join()

不起作用的代码是:

import multiprocessing as mp
def multiproc_log_result(retval):
results.append(retval)
if len(results) % (10 // 10) == 0:
print('{0}% done'.format(100 * len(results) / 10))
def meat():
print 'beef'
status = True
return status
results = []
pool = mp.Pool(thread_count)
for x in range(10):
pool.apply_async(meat, callback=multiproc_log_result)
pool.close()
pool.join()
def nested_stupid_fn():
def multiproc_log_result(retval):
results.append(retval)
if len(results) % (10 // 10) == 0:
print('{0}% done'.format(100 * len(results) / 10))
def veggie():
print 'carrot'
status = True
return status
results = []
pool = mp.Pool(thread_count)
for x in range(10):
pool.apply_async(veggie, callback=multiproc_log_result)
pool.close()
pool.join()
nested_stupid_fn()

最终,我希望通过将其存在于单独模块中的另一个函数中来删除该不起作用的示例。因此,当我导入模块packngo并将其用作packngo.basic_packngo(输入)并在其中的某个位置具有嵌套函数的内容时,它们将运行。任何帮助将不胜感激。:D我是一个非常单纯的人,所以如果你能像你对一个孩子一样解释,也许它会沉入我的脑海!

您链接的另一个问题有解决方案,只是没有详细说明:您不能使用嵌套函数作为multiprocessing.Poolapply*/*map*系列方法的func参数。它们适用于multiprocessing.dummy.Pool,因为multiprocessing.dummy由线程支持,线程可以直接传递函数引用,但multiprocessing.Pool必须挑制函数,并且只有具有可导入名称的函数才能被挑制。如果你检查嵌套函数的名称,它类似于modulename.outerfuncname.<locals>.innerfuncname,并且<locals>组件使其无法导入(这通常是一件好事;利用嵌套的嵌套函数通常在闭包范围内具有临界状态,仅导入会丢失)。

以嵌套方式定义callback函数是完全可以的,因为它们是在父进程中执行的,它们不会发送给工作线程。在您的情况下,只有回调依赖于闭包范围,因此将func(veggie)移出到全局范围是完全可以的,将packngo模块定义为:

def veggie():
print 'carrot'
status = True
return status
def nested_stupid_fn():
def multiproc_log_result(retval):
results.append(retval)
if len(results) % (10 // 10) == 0:
print('{0}% done'.format(100 * len(results) / 10))
results = []
pool = mp.Pool(thread_count)
for x in range(10):
pool.apply_async(veggie, callback=multiproc_log_result)
pool.close()
pool.join()
nested_stupid_fn()

是的,这意味着veggie成为相关模块的公共成员。如果你想指示它应该被视为一个实现细节,你可以在它前面加上下划线(_veggie),但它必须是全局的才能与multiprocessing.Pool一起使用。

我认为问题是在multiproc_log_result范围内不存在变量results。 因此,您应该做的是将异步调用的结果直接附加到结果中。 不过,您将无法跟踪进度(我猜无法直接共享类外回调函数的全局变量)

from multiprocessing.pool import ThreadPool
def nested_stupid_fn():
def multiproc_log_result(retval):
results.append(retval)
def veggie():
print 'carrot'
status = True
return status
results = []
pool = ThreadPool(thread_count)
for x in range(10):
results.append(pool.apply_async(veggie))
pool.close()
pool.join()
results = [result.get() for result in results]  # get value from async result
...then do stuff with results

相关内容

  • 没有找到相关文章

最新更新