我正在创建一个线程管理器类,它将执行任务作为线程处理,并将结果传递给下一个进程步骤。该流在第一次执行接收任务时正常工作,但第二次执行失败,出现以下错误:
...python3.8/concurrent/futures/thread.py", line 179, in submit
raise RuntimeError('cannot schedule new futures after shutdown')
RuntimeError: cannot schedule new futures after shutdown
任务来自Cmd.cmdloop
用户输入-因此,脚本是持久的,意味着不会关闭。相反,当从用户接收到输入时,run
将被多次调用。
我已经实现了一个ThreadPoolExecutor
来处理工作负载,并试图用concurrent.futures.as_completed
按时间顺序收集结果,以便每个项目按照完成的顺序处理到下一步。
下面的run
方法在第一次执行时完美地工作,但在第二次执行相同的任务时返回错误(在第一次执行期间成功)。
def run ( self, _executor=None, _futures={}, ) -> bool :
task = self.pipeline.get( )
with _executor or self.__default_executor as executor :
_futures = { executor.submit ( task.target.execute, ), }
for future in concurrent.futures.as_completed ( _futures, ) :
print( future.result ( ) )
return True
所以,这个想法是,每次调用run
将创建和拆除executor
与上下文。但是错误提示上下文关闭正确后,第一次执行,并不能重新开放/重新创建时,run
在第二次迭代期间调用…这个错误指向什么?. .我错过了什么?
任何帮助将是伟大的-提前感谢。
你最简单的解决方案是使用multiprocessing库,而不是发送future和ThreadPoolExecutore与Context Manager:
pool = ThreadPool(50)
pool.starmap(test_function, zip(array1,array2...))
pool.close()
pool.join()
而(array1[0] , array2[0])
将是发送给function" test_function"在第一个线程,(array1[1] , array2[1])
在第二个线程,以此类推。