如何在 Python 中的 concurrent.futures.ProcessPoolExecutor 中传递"lock"?



我正在真正的Android智能手机上运行Python 3.7和Appium 1.15.1的并行测试。

我使用concurrent.forets.ProcessPoolExecutor在每个智能手机上运行每个测试。

我正在将智能手机的uid列表传递给我的地图功能。通过这种方式,我的方法"run_smartphone(("(启动测试(获取智能手机的uid,并确定它必须在哪个智能手机上运行测试。

我的剧本写得很好,没有任何问题。但是我想添加一个"锁",因为'run_smartphone(('在sqlite3数据库上进行一些I/O。所以,如果我错了,请纠正我,但"锁定"这个sqlite3数据库上的I/O操作将是一个很好的做法?

这是我的原始代码,它工作:

def run_smartphone(p_udid):
#do the stuff
list_smartphones_connected = [41492968379078, 53519716736397]
with concurrent.futures.ProcessPoolExecutor() as executor:
try:
multiprocesses = executor.map(mymodules.run_smartphone, list_smartphones_connected)
except ValueError:
print(("Error multiprocesses"))

因此,我尝试将pass"lock"添加到我的方法"run_smartphone(("中。这是我写的:

m = multiprocessing.Manager()
lock = m.Lock()
list_arguments_smartphones = []
list_smartphones_connected = [41492968379078, 53519716736397]
for smartphone_connected in list_smartphones_connected:        
list_arguments_smartphones.append([smartphone_connected, lock])
with concurrent.futures.ProcessPoolExecutor() as executor:
try:
multiprocesses = executor.map(mymodules.run_smartphone, list_arguments_smartphones)
except ValueError:
print(("Error multiprocesses"))

但它不起作用,我也没有得到任何例外。Pycharm停止脚本:

Process finished with exit code 0

我不知道是什么阻止了剧本。

因此,我开始通过执行1部智能手机的脚本进行调查,其中包括:

multiprocesses = executor.map(mymodules.run_smartphone, [41492968379078,lock])

它给出了相同的结果=>脚本停止,没有自动启动,我没有看到任何异常(进程结束,退出代码为0(。

由于我想知道问题的确切位置,我用"trace"运行了脚本。

py -m trace --trace  myscript.py

但我什么都不懂,我没有看到任何错误。。。你可以在我上传到GitHub:的文本文件上看到这个"trace"命令的输出

https://github.com/gauthierbuttez/public/blob/master/trace-log.txt

有人知道如何将"锁"传递给我的concurrent.forets.ProcessPoolExecutor((吗?这样做是个好主意吗?

谢谢。

希望这能帮助你。。。

m = multiprocessing.Manager()
lock = m.Lock()
def run_smartphone(p_udid, lock): 
# further code
list_smartphones_connected = [41492968379078, 53519716736397] 
with concurrent.futures.ProcessPoolExecutor() as executor: 
try: 
multiprocesses = executor.map(run_smartphone, list_smartphones_connected, [lock]*len(list_smartphones_connected)) 
for function_return_value in multiprocesses:
print(function_return_value)
except ValueError: 
print(("Error multiprocesses"))

从map((的文档:

如果func调用引发异常,则将引发该异常当从迭代器检索其值时。

换句话说,您可能必须实际使用run_smartphone的返回值:

m = multiprocessing.Manager()
lock = m.Lock()
list_arguments_smartphones = []
list_smartphones_connected = [41492968379078, 53519716736397]
for smartphone_connected in list_smartphones_connected:        
list_arguments_smartphones.append([smartphone_connected, lock])
with concurrent.futures.ProcessPoolExecutor() as executor:
try:
multiprocesses = executor.map(mymodules.run_smartphone, list_arguments_smartphones)
for function_return_value in multiprocesses:
print(function_return_value)
# or do something with the value, like insert into a db
except ValueError:
print(("Error multiprocesses"))

但是,如果您甚至在向函数传递锁时遇到问题,并且您还没有阅读文档,那么我建议您重组代码,使run_smartphone只从数据库读取(不需要锁(,并在for循环中向数据库写入(不需要锁定(。否则,你将陷入僵局。

相关内容

  • 没有找到相关文章

最新更新