ProcessPoolExecutor in a Python class causes: cannot pickle 'weakref' object



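I have the following code, which raises "cannot pickle 'weakref' object".

I have seen many examples where people wrap ProcessPoolExecutor in if __name__ == '__main__', but that is not possible inside my class. I could not find any example of using ProcessPoolExecutor inside a Python class.

For reference, ThreadPoolExecutor works fine.

Does anyone know how to do this?

The stack trace is: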

File "C:UsersmikkeALG_EXPThreeSumpythonSolImplementationsnear_linear_algRandomizedMultiThreaded.py", line 35, in fasterSubsetSum
    S = self.sumSet(S, f.result(), t)
File "C:\Python38\lib\concurrent\futures\_base.py", line 439, in result
    return self.__get_result()
File "C:\Python38\lib\concurrent\futures\_base.py", line 388, in __get_result
    raise self._exception
File "C:\Python38\lib\multiprocessing\queues.py", line 239, in _feed
    obj = _ForkingPickler.dumps(obj)
File "C:\Python38\lib\multiprocessing\reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object

The code is:

import concurrent.futures
import math
import threading
import numpy as np
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor, as_completed
from Implementations.helpers.Helper import toNumbers
from Implementations.near_linear_alg.RandomizedBase import NearLinearBase


class RandomizedMultiThreaded(NearLinearBase):
    def __init__(self, debug):
        super().__init__(debug, 0)
        self.executor = ProcessPoolExecutor(max_workers=12)

    def fasterSubsetSum(self, Z, t, delta):
        n = len(Z)
        self.n = n
        Z = np.array(Z)
        Zi = self.partitionIntoLayers(Z, n, t)
        S = [1]
        if len(Zi[0]) > 1:
            S = Zi[0]
        futures = list()
        for i in range(1, len(Zi)):
            z = np.array(Zi[i])
            if len(z) > 1:
                ans = self.executor.submit(self.ColorCodingLayerMulti, z, t, pow(2, i + 1) - 1,
                                           delta / (math.ceil(math.log2(n))))
                futures.append(ans)
        for f in futures:
            S = self.sumSet(S, f.result(), t)
        return toNumbers(S)

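The executor cannot be a member of the object whose method you pass to submit. Submitting a bound method such as self.ColorCodingLayerMulti pickles self along with it, and therefore self.executor, which cannot be serialized and sent to another process.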

This minimal example reproduces your error. It raises TypeError: can't pickle weakref objects:

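from concurrent.futures import ProcessPoolExecutor


class Example:
    def __init__(self):
        self.executor = ProcessPoolExecutor(2)

    def compute(self, x):
        return x * x

    def run(self):
        futures = []
        for i in range(5):
            # self.compute is a bound method: pickling it pickles self,
            # and with it self.executor, which is what fails.
            futures.append(self.executor.submit(self.compute, i))
        return sum(future.result() for future in futures)


# The guard keeps worker processes from re-running this line on Windows.
if __name__ == '__main__':
    print(Example().run())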
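To see why the executor gets dragged along, here is a minimal sketch that starts no processes at all. It assumes a hypothetical `Holder` class whose `ref` attribute is a plain `weakref.ref`, standing in for the executor's internal state; `Target`, `try_pickle` and `square` are illustrative names as well. It only shows that pickling a bound method pickles the instance behind it, while a module-level function pickles fine.

```python
import pickle
import weakref


class Target:
    """Hypothetical object that exists only to be weakly referenced."""


class Holder:
    def __init__(self):
        self.target = Target()
        # Stand-in for the executor: an attribute that cannot be pickled.
        self.ref = weakref.ref(self.target)

    def compute(self, x):
        return x * x


def square(x):
    return x * x


def try_pickle(obj):
    """Return None if obj pickles, otherwise the TypeError message."""
    try:
        pickle.dumps(obj)
        return None
    except TypeError as exc:
        return str(exc)


holder = Holder()
# Pickling the bound method pickles holder itself, including holder.ref.
error = try_pickle(holder.compute)
print("bound method:", error)
# A module-level function is pickled by name only, with no instance attached.
print("plain function:", try_pickle(square))
```

The exact wording of the error message differs between Python versions, so the sketch prints it rather than assuming it.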

You have two options to fix this:

Solution A: remove the executor from the class

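from concurrent.futures import ProcessPoolExecutor


class Example:
    def compute(self, x):
        return x * x

    def run(self):
        # The executor is a local variable, so it is not part of self
        # and is not pickled together with self.compute.
        with ProcessPoolExecutor(2) as executor:
            futures = []
            for i in range(5):
                futures.append(executor.submit(self.compute, i))
            return sum(future.result() for future in futures)


# The guard keeps worker processes from re-running this line on Windows.
if __name__ == '__main__':
    print(Example().run())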

Solution B: do not pass self to the callback

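from concurrent.futures import ProcessPoolExecutor


# A module-level function is pickled by name, without any instance.
def compute(x):
    return x * x


class Example:
    def __init__(self):
        self.executor = ProcessPoolExecutor(2)

    def run(self):
        futures = []
        for i in range(5):
            futures.append(self.executor.submit(compute, i))
        return sum(future.result() for future in futures)


# The guard keeps worker processes from re-running this line on Windows.
if __name__ == '__main__':
    print(Example().run())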
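If you need both the executor on the instance and a method that uses self, a possible third route is to exclude the executor from the pickled state with `__getstate__` and `__setstate__`. This goes beyond the two solutions above and is only a sketch: the `scale` attribute is a hypothetical piece of state added for illustration, and the worker-side copies deliberately end up with `executor = None`.

```python
import pickle
from concurrent.futures import ProcessPoolExecutor


class Example:
    def __init__(self):
        self.scale = 3  # hypothetical picklable state the workers need
        self.executor = ProcessPoolExecutor(2)

    def __getstate__(self):
        # Copy the instance dict and drop the executor before pickling.
        state = self.__dict__.copy()
        state.pop("executor", None)
        return state

    def __setstate__(self, state):
        # Copies rebuilt in a worker get no executor; they only run compute().
        self.__dict__.update(state)
        self.executor = None

    def compute(self, x):
        return self.scale * x

    def run(self):
        futures = [self.executor.submit(self.compute, i) for i in range(5)]
        return sum(future.result() for future in futures)


if __name__ == "__main__":
    example = Example()
    # The same round trip the pool performs on the submitted bound method.
    clone = pickle.loads(pickle.dumps(example.compute)).__self__
    print(clone.executor, clone.scale)
    print(example.run())
    example.executor.shutdown()
```

The trade-off is that every submit still pickles the rest of self, so large attributes are copied to the workers on each call; Solution B avoids that cost entirely.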
