如果我有这个:
import concurrent.futures
def SetParameters():
return { "processVal" : 'q',
"keys" : { "key1" : { "fn" : a,
"future" : None,
"futureResult" : None },
"key2" : { "fn" : b,
"future" : None,
"futureResult" : None }
}
}
#in my target production code, a() and b() and around a dozen others will each
#take a long while to complete and are IO-bound, hence the desire for
#parallelization
def a(x):
# x is return of SetParameters()
return f"1{x['processVal']}2"
def b(x):
# x is return of SetParameters()
return f"3{x['processVal']}4"
def c(myDict):
with concurrent.futures.ProcessPoolExecutor() as exe:
for key in myDict["keys"]:
keyDict = myDict["keys"][key]
keyDict["future"] = exe.submit(keyDict["fn"], myDict)
for key in myDict["keys"]:
keyDict = myDict["keys"][key]
keyDict["futureResult"] = keyDict["future"].result()
def main():
data = SetParameters()
c(data)
for key in data["keys"]:
print(data["keys"][key]["futureResult"])
if __name__ == "__main__":
main()
当我执行这个代码时,我有一个失控的分叉问题需要解决,但这个线程中的请求是理解这个错误以及如何处理它:
TypeError:无法pickle'_thread.RLock'对象
我怀疑这是SetParameters
返回的数据结构,但只是从第二次通过第一次for
循环开始。我的直觉是futures
对象是不可拾取的(因此问题只出现在第二次传递中(,但如果事先不知道有多少key<n>
对象,我如何将函数结果返回到myDict[keys][key<n>][futuresResult]
中?
你对错误的猜测是对的。您将一个future
对象保存到字典中,然后将该字典传递给并行函数,这需要参数是可拾取的。我假设您不需要函数中的结果。因此,在不知道如何使用它的情况下,有几个选择。
- 只需将所需内容传递到函数中即可
import concurrent.futures
def SetParameters():
return { "processVal" : 'q',
"keys" : { "key1" : { "fn" : a,
"future" : None,
"futureResult" : None },
"key2" : { "fn" : b,
"future" : None,
"futureResult" : None }
}
}
#in my target production code, a() and b() and around a dozen others will each
#take a long while to complete and are IO-bound, hence the desire for
#parallelization
def a(x):
# x is return of SetParameters()
return f"1{x}2"
def b(x):
# x is return of SetParameters()
return f"3{x}4"
def c(myDict):
with concurrent.futures.ProcessPoolExecutor() as exe:
for key in myDict["keys"]:
keyDict = myDict["keys"][key]
keyDict["future"] = exe.submit(keyDict["fn"], myDict['processVal'])
for key in myDict["keys"]:
keyDict = myDict["keys"][key]
keyDict["futureResult"] = keyDict["future"].result()
def main():
data = SetParameters()
c(data)
for key in data["keys"]:
print(data["keys"][key]["futureResult"])
if __name__ == "__main__":
main()
- 制作myDict的深度副本,但要知道没有函数可以使用结果(看起来也不会像你希望的那样(
import concurrent.futures
import copy
def SetParameters():
return { "processVal" : 'q',
"keys" : { "key1" : { "fn" : a,
"future" : None,
"futureResult" : None },
"key2" : { "fn" : b,
"future" : None,
"futureResult" : None }
}
}
#in my target production code, a() and b() and around a dozen others will each
#take a long while to complete and are IO-bound, hence the desire for
#parallelization
def a(x):
# x is return of SetParameters()
return f"1{x['processVal']}2"
def b(x):
# x is return of SetParameters()
return f"3{x['processVal']}4"
def c(myDict):
myCopy = copy.deepcopy(myDict)
with concurrent.futures.ProcessPoolExecutor() as exe:
for key in myDict["keys"]:
keyDict = myDict["keys"][key]
keyDict["future"] = exe.submit(keyDict["fn"], myCopy)
for key in myDict["keys"]:
keyDict = myDict["keys"][key]
keyDict["futureResult"] = keyDict["future"].result()
def main():
data = SetParameters()
c(data)
for key in data["keys"]:
print(data["keys"][key]["futureResult"])
if __name__ == "__main__":
main()
- 将结果保存到其他地方,稍后合并(或不合并(
import concurrent.futures
from collections import defaultdict
def SetParameters():
return { "processVal" : 'q',
"keys" : { "key1" : { "fn" : a,
"future" : None,
"futureResult" : None },
"key2" : { "fn" : b,
"future" : None,
"futureResult" : None }
}
}
#in my target production code, a() and b() and around a dozen others will each
#take a long while to complete and are IO-bound, hence the desire for
#parallelization
def a(x):
# x is return of SetParameters()
return f"1{x['processVal']}2"
def b(x):
# x is return of SetParameters()
return f"3{x['processVal']}4"
def c(myDict):
myResults = defaultdict(dict)
with concurrent.futures.ProcessPoolExecutor() as exe:
for key in myDict["keys"]:
myResults[key]["future"] = exe.submit(myDict['keys'][key]["fn"], myDict)
for key in myDict["keys"]:
myResults[key]["futureResult"] = myResults[key]["future"].result()
return myResults
def main():
data = SetParameters()
res = c(data)
for result in res.values():
print(result["futureResult"])
if __name__ == "__main__":
main()
我相信还有更多。但这些都是一些通用的解决方法,取决于你想要实现的目标。