很抱歉,我无法用更简单的示例重现错误,而且我的代码太复杂,无法发布。如果我在IPython shell中运行该程序,而不是在常规Python中运行,那么一切都会很顺利。
我查阅了以前关于这个问题的一些笔记。它们都是由使用池来调用类函数中定义的函数引起的。但我的情况并非如此。
Exception in thread Thread-3:
Traceback (most recent call last):
File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib64/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
如果有任何帮助,我将不胜感激。
更新:我pickle的函数是在模块的顶层定义的。尽管它调用了一个包含嵌套函数的函数。即f()
调用g()
调用具有嵌套函数i()
的h()
,而我正在调用pool.apply_async(f)
。f()
、g()
、h()
都是在顶层定义的。我用这个模式尝试了一个更简单的例子,它仍然有效。
以下是可以腌制的食物列表。特别是,只有在模块的顶层定义了函数时,这些函数才是可拾取的。
这段代码:
import multiprocessing as mp
class Foo():
@staticmethod
def work(self):
pass
if __name__ == '__main__':
pool = mp.Pool()
foo = Foo()
pool.apply_async(foo.work)
pool.close()
pool.join()
产生的错误与您发布的错误几乎相同:
Exception in thread Thread-2:
Traceback (most recent call last):
File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
self.run()
File "/usr/lib/python2.7/threading.py", line 505, in run
self.__target(*self.__args, **self.__kwargs)
File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
问题是pool
方法都使用mp.SimpleQueue
将任务传递给工作进程。通过mp.SimpleQueue
的所有内容都必须是可拾取的,而foo.work
不可拾取,因为它不是在模块的顶层定义的。
它可以通过在顶层定义一个函数来修复,该函数调用foo.work()
:
def work(foo):
foo.work()
pool.apply_async(work,args=(foo,))
请注意,foo
是可拾取的,因为Foo
是在顶层定义的,而foo.__dict__
是可拾取。
我会使用pathos.multiprocesssing
,而不是multiprocessing
。CCD_ 19是使用CCD_ 21的CCD_。dill
可以在python中序列化几乎任何内容,因此您可以并行发送更多内容。pathos
fork还能够直接使用多个参数函数,这是类方法所需要的。
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>>
>>> class Foo(object):
... @staticmethod
... def work(self, x):
... return x+1
...
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101
在此处获取pathos
(如果您愿意,还可以获取dill
):https://github.com/uqfoundation
当multiprocessing
出现此问题时,一个简单的解决方案是从Pool
切换到ThreadPool
。这可以在不改变代码的情况下完成,除了导入-
from multiprocessing.pool import ThreadPool as Pool
这是因为ThreadPool与主线程共享内存,而不是创建一个新进程——这意味着不需要酸洗。
这种方法的缺点是python并不是处理线程的最佳语言——它使用了一种名为全局解释器锁的东西来保持线程安全,这可能会减缓一些用例的速度。然而,如果您主要与其他系统交互(运行HTTP命令、与数据库对话、向文件系统写入),那么您的代码可能不受CPU的约束,也不会受到太大的影响。事实上,我在编写HTTP/HTTPS基准测试时发现,这里使用的线程模型的开销和延迟较小,因为创建新进程的开销远高于创建新线程的开销,而且程序在其他方面只是在等待HTTP响应。
因此,如果你在python用户空间中处理大量的东西,这可能不是最好的方法。
正如其他人所说,multiprocessing
只能将Python对象传输到可以进行pickle的工作进程。如果您不能按照unsubu的描述重新组织代码,您可以使用dill
的扩展pickle/unpickling功能来传输数据(尤其是代码数据),如下所示。
此解决方案只需要安装dill
,不需要像pathos
:那样安装其他库
import os
from multiprocessing import Pool
import dill
def run_dill_encoded(payload):
fun, args = dill.loads(payload)
return fun(*args)
def apply_async(pool, fun, args):
payload = dill.dumps((fun, args))
return pool.apply_async(run_dill_encoded, (payload,))
if __name__ == "__main__":
pool = Pool(processes=5)
# asyn execution of lambda
jobs = []
for i in range(10):
job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
jobs.append(job)
for job in jobs:
print job.get()
print
# async execution of static method
class O(object):
@staticmethod
def calc():
return os.getpid()
jobs = []
for i in range(10):
job = apply_async(pool, O.calc, ())
jobs.append(job)
for job in jobs:
print job.get()
我发现,通过尝试在一段完美工作的代码上使用探查器,我也可以在它上准确地生成错误输出。
请注意,这是在Windows上进行的(在Windows中,分叉稍微不那么优雅)。
我在跑步:
python -m profile -o output.pstats <script>
发现删除评测删除了错误,并放置评测恢复了错误。这也让我很恼火,因为我知道代码过去是有效的。我在检查是否有什么东西更新了pool.py…然后有一种下沉的感觉,消除了分析,就这样。
在这里张贴档案,以防其他人碰到。
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
如果在传递给异步作业的模型对象中有任何内置函数,也会出现此错误。
因此,请确保检查传递的模型对象是否没有内置函数。(在我们的案例中,我们在模型中使用django模型utils的FieldTracker()
函数来跟踪某个字段)。这是相关GitHub问题的链接。
该解决方案只需要安装dill,而不需要像pathos 那样安装其他库
def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
"""
Unpack dumped function as target function and call it with arguments.
:param (dumped_function, item, args, kwargs):
a tuple of dumped function and its arguments
:return:
result of target function
"""
target_function = dill.loads(dumped_function)
res = target_function(item, *args, **kwargs)
return res
def pack_function_for_map(target_function, items, *args, **kwargs):
"""
Pack function and arguments to object that can be sent from one
multiprocessing.Process to another. The main problem is:
«multiprocessing.Pool.map*» or «apply*»
cannot use class methods or closures.
It solves this problem with «dill».
It works with target function as argument, dumps it («with dill»)
and returns dumped function with arguments of target function.
For more performance we dump only target function itself
and don't dump its arguments.
How to use (pseudo-code):
~>>> import multiprocessing
~>>> images = [...]
~>>> pool = multiprocessing.Pool(100500)
~>>> features = pool.map(
~... *pack_function_for_map(
~... super(Extractor, self).extract_features,
~... images,
~... type='png'
~... **options,
~... )
~... )
~>>>
:param target_function:
function, that you want to execute like target_function(item, *args, **kwargs).
:param items:
list of items for map
:param args:
positional arguments for target_function(item, *args, **kwargs)
:param kwargs:
named arguments for target_function(item, *args, **kwargs)
:return: tuple(function_wrapper, dumped_items)
It returs a tuple with
* function wrapper, that unpack and call target function;
* list of packed target function and its' arguments.
"""
dumped_function = dill.dumps(target_function)
dumped_items = [(dumped_function, item, args, kwargs) for item in items]
return apply_packed_function_for_map, dumped_items
它也适用于numpy数组。
一个快速的解决方案是将函数设置为全局
from multiprocessing import Pool
class Test:
def __init__(self, x):
self.x = x
@staticmethod
def test(x):
return x**2
def test_apply(self, list_):
global r
def r(x):
return Test.test(x + self.x)
with Pool() as p:
l = p.map(r, list_)
return l
if __name__ == '__main__':
o = Test(2)
print(o.test_apply(range(10)))
基于@rocksportrocker解决方案,在发送和接收结果时磨磨蹭蹭是有意义的。
import dill
import itertools
def run_dill_encoded(payload):
fun, args = dill.loads(payload)
res = fun(*args)
res = dill.dumps(res)
return res
def dill_map_async(pool, fun, args_list,
as_tuple=True,
**kw):
if as_tuple:
args_list = ((x,) for x in args_list)
it = itertools.izip(
itertools.cycle([fun]),
args_list)
it = itertools.imap(dill.dumps, it)
return pool.map_async(run_dill_encoded, it, **kw)
if __name__ == '__main__':
import multiprocessing as mp
import sys,os
p = mp.Pool(4)
res = dill_map_async(p, lambda x:[sys.stdout.write('%sn'%os.getpid()),x][-1],
[lambda x:x+1]*10,)
res = res.get(timeout=100)
res = map(dill.loads,res)
print(res)
正如@penky Suresh在这个答案中建议的那样,不要使用内置关键字。
显然,在处理多处理时,args
是一个内置关键字
class TTS:
def __init__(self):
pass
def process_and_render_items(self):
multiprocessing_args = [{"a": "b", "c": "d"}, {"e": "f", "g": "h"}]
with ProcessPoolExecutor(max_workers=10) as executor:
# Using args here is fine.
future_processes = {
executor.submit(TTS.process_and_render_item, args)
for args in multiprocessing_args
}
for future in as_completed(future_processes):
try:
data = future.result()
except Exception as exc:
print(f"Generated an exception: {exc}")
else:
print(f"Generated data for comment process: {future}")
# Dont use 'args' here. It seems to be a built-in keyword.
# Changing 'args' to 'arg' worked for me.
def process_and_render_item(arg):
print(arg)
# This will print {"a": "b", "c": "d"} for the first process
# and {"e": "f", "g": "h"} for the second process.
附言:标签页/空格可能有点偏离。