Python concurrent.futures atexit中出错_run_exitfuncs:OSError:句柄



我试图实现concurrent.futures遇到了问题,我生成了一个代码,当在中执行时

  • Python终端

  • Enters in some infinite loop in my last loop and do not finish the code, this code below do finish, however reproduces the error thgat i receive in the Debug Mode below
    
  • 所以当我在Visual Studio代码调试模式中运行它时

  • 生成以下错误:

    Error in atexit._run_exitfuncs:
    Traceback (most recent call last):
    File "C:Usersdeboranaconda3envsmultimidia_imagelibconcurrentfuturesprocess.py", line 102, in _python_exit
    thread_wakeup.wakeup()
    File "C:Usersdeboranaconda3envsmultimidia_imagelibconcurrentfuturesprocess.py", line 90, in wakeup
    self._writer.send_bytes(b"")
    File "C:Usersdeboranaconda3envsmultimidia_imagelibmultiprocessingconnection.py", line 183, in send_bytes
    self._check_closed()
    File "C:Usersdeboranaconda3envsmultimidia_imagelibmultiprocessingconnection.py", line 136, in _check_closed
    raise OSError("handle is closed")
    OSError: handle is closed
    

此代码再现调试模式中的错误:

import numpy as np
import pandas as pd
import concurrent.futures
import multiprocessing
from itertools import product

def main_execucao(n_exec,input_n_geracoes_max,tipo_apt,input_pop_0,tipo_pop_gerar,tipo_selecao_crossover,tour,tipo_crossover,input_p_cross,tipo_mutacao,tipo_reinsercao,words):
# Dicionário vazio para armazenar resultados por geração n+1
resultados = {}
resultados[("teste")] = [1,0]
return resultados

def main(): 
# Parametros problemáticos Process
tmi = ["tm1"]
si = ["s1"]
ci = ["c1"]
ri = ["r2"]
# Lista de variantes
variantes = list(product(*[tmi, si, ci, ri]))
# Numero de execuções
input_n_execucoes = 3

# Inputs para main
# Inputs gerais
# Lista iterável para chamar função
lista_n_exec=range(0,input_n_execucoes)
# Numero de gerações
input_n_geracoes_max =  [50] * len(lista_n_exec)
# Palavras
w_1=np.flip(np.array(list("send")),0)
w_2=np.flip(np.array(list("more")),0)
w_3=np.flip(np.array(list("money")),0)
words=[w_1,w_2,w_3]
words_lista=[words]* len(lista_n_exec)
for param in variantes:
# Inputs parameters

# Tipo aptidao (apt_1= aptidão simples, apt_2 aptidão invertida, apt_3 invertida normalizada pior valor)
tipo_apt = "apt_1"
tipo_apt_lista = [tipo_apt] * len(lista_n_exec)
# População inicial
input_pop_0 = [100] * len(lista_n_exec)
tipo_pop_gerar = ["sem_repeticao_populacao_inicial"] * len(lista_n_exec)

nome_selecao_cross  =param[1]
tipo_selecao_crossover = [nome_selecao_cross] * len(lista_n_exec)
tour = [3] * len(lista_n_exec)# Apenas se seleção for torneio 
# Tipo de crossover
nome_cross = param[2]
tipo_crossover = [nome_cross] * len(lista_n_exec)
# % de Crossover
input_p_cross = [0.8] * len(lista_n_exec)
# % de mutação
nome_mutacao = param[0]
if nome_mutacao=="tm1":
perc_mutacao=0.02
elif nome_mutacao=="tm2":
perc_mutacao=0.10
elif nome_mutacao=="tm3":
perc_mutacao=0.2
tipo_mutacao = [perc_mutacao] * len(lista_n_exec)
# Tipo de reinserção
nome_reinsercao = param[3]
tipo_reinsercao = [nome_reinsercao] * len(lista_n_exec)
# Finaliza inputs
# Index contar número de execuções e gerações
ix_exec_real=0
ger_exec_real=0
# Número total de convergencias após todas execuções 
total_conv_tds_exec=0
# Dicionário para armazenar resultados de todas as n execuções
results={}
with concurrent.futures.ProcessPoolExecutor() as executor:
for result in (executor.map(main_execucao,lista_n_exec,input_n_geracoes_max,tipo_apt_lista,input_pop_0,tipo_pop_gerar,tipo_selecao_crossover,tour,tipo_crossover,input_p_cross,tipo_mutacao,tipo_reinsercao,words_lista)):
results.update(result)

print("finish")
if __name__ == '__main__':
multiprocessing.freeze_support()
main()

有人知道我能尝试什么吗?我发现了一个类似的问题:https://github.com/getsentry/sentry-python/issues/423

非常感谢

根据#434,

Python标准库-concurrent.futures使用线程进行管理任务队列,因此当哨兵修补线程的启动方法时,它将导致引用循环并阻止gc收集。

因此,在concurrent.futures.process中_在运行完整的gc集合之前,threadswakeups将被丢弃,否则,当python退出时将引发异常,因为实际管理线程已停止。

可以在那里找到修复程序,允许用户手动将其输入到Python 3.8中,而不是升级到Python 3.9。

通过更改两行,从使用concurrent.futures.ProcessPoolExecutor类切换到使用multiprocess.pool.Pool类,查看问题是否消失:

更改自:

with concurrent.futures.ProcessPoolExecutor() as executor:
for result in (executor.map(main_execucao,lista_n_exec,input_n_geracoes_max,tipo_apt_lista,input_pop_0,tipo_pop_gerar,tipo_selecao_crossover,tour,tipo_crossover,input_p_cross,tipo_mutacao,tipo_reinsercao,words_lista)):

收件人:

with multiprocessing.Pool() as executor:

for result in (executor.starmap(main_execucao,zip(lista_n_exec,input_n_geracoes_max,tipo_apt_lista,input_pop_0,tipo_pop_gerar,tipo_selecao_crossover,tour,tipo_crossover,input_p_cross,tipo_mutacao,tipo_reinsercao,words_lista))):

当然,您现在也可以删除import concurrent.futures语句。

最新更新