我想给一个try语句加上时间限制:如果n秒后仍未完成,就抛出TimeoutException。我找到了一个可以做到这一点的库,叫做signal,本来应该很完美,但我遇到了一个难以解决的错误。(这个类似的SO问题就是用signal库来回答的。)
这是代表问题的简化代码:
import multiprocessing
from multiprocessing.dummy import Pool
def main():
    """Run processRunSeveralTimesInParallel over the links on a 2-thread pool."""
    listOfLinks = []
    # multiprocessing.dummy.Pool is a pool of threads, not of processes.
    threadpool = Pool(2)
    # zip() wraps every (index, link) pair from enumerate() in a 1-tuple, so
    # starmap passes the whole pair to the worker as its single argument.
    info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
    threadpool.close()
    # Wait for the worker threads to exit after the pool has been closed.
    threadpool.join()
def processRunSeveralTimesInParallel(listOfLinks):
# NOTE: everything below is pseudo-code, not valid Python. It sketches the
# desired behaviour: after a long sequence of instructions, loop ten times
# and give each doSomething(i) call at most n seconds; when that limit is
# exceeded, handleException() should run instead.
loongSequenceOfInstructions()
for i in range(0,10):
try for n seconds:
doSomething(i)
except (after n seconds):
handleException()
return something
当用信号库实现上述问题的解决方案时,我得到以下错误:
File "file.py", line 388, in main
info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 372, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 771, in get
raise self._value
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 51, in starmapstar
return list(itertools.starmap(args[0], args[1]))
File "file.py", line 193, in processRunSeveralTimesInParallel
signal.signal(signal.SIGALRM, signal_handler)
File "/Users/user/anaconda3/envs/proj/lib/python3.8/signal.py", line 47, in signal
handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread
知道如何在作为线程运行的方法中仅在try块上限制时间吗?非常感谢。
重要信息:
- 我使用multiprocessing库同时并行运行多个进程。从上面的错误信息来看,我怀疑signal库和multiprocessing库存在冲突
- try语句中调用的是selenium的find_element_by_xpath()方法。但是,该方法没有可用的超时参数
最新更新的答案
如果你在寻找一种不使用信号就能实现超时的方法,这里有一种。首先,既然你使用的是线程,那就明确地使用线程,并采用灵活性很高的concurrent.futures模块。
当一个“作业”(job)被提交给池执行器时,会立即返回一个Future实例而不阻塞;直到对该实例调用result方法时才会阻塞。调用result时可以指定一个timeout值,这样,如果在超时时间内结果仍不可用,就会引发异常。思路是把ThreadPoolExecutor实例传递给工作函数,让它把必须在特定时间段内完成的关键代码提交到线程池,在单独的工作线程中运行。这段限时代码同样会得到一个Future实例,但这次调用result时会指定timeout值:
from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time
def main():
    """Process every link on a shared thread pool and print each result.

    One worker task is submitted per link. Each worker receives the same
    executor so that it can submit its own time-limited sub-tasks to it.
    """
    listOfLinks = ['a', 'b', 'c', 'd', 'e']
    # To prevent timeout errors due to lack of threads, the pool needs at
    # least one thread more than the number of workers created here, so that
    # at least one time_critical task can start. Ideally every time_critical
    # task can start without waiting, which takes two threads per link. The
    # size is derived from the list so it stays correct if the list changes;
    # max(1, ...) keeps an empty list from requesting a zero-sized pool.
    max_workers = max(1, 2 * len(listOfLinks))
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # pass executor to our worker
        futures = [
            executor.submit(processRunSeveralTimesInParallel, item, executor)
            for item in enumerate(listOfLinks)
        ]
        # Collect in submission order so the output follows the list order.
        for future in futures:
            result = future.result()
            print('result is', result)
def processRunSeveralTimesInParallel(tuple, executor):
    """Process one link, allowing each time-critical step at most 2 seconds.

    Args:
        tuple: An ``(index, link)`` pair such as ``enumerate`` yields. The
            name shadows the builtin but is kept so callers are unaffected.
        executor: Executor to which every ``time_critical`` call is submitted.

    Returns:
        ``link * index``; for a string link, the link repeated ``index`` times.
    """
    link_number = tuple[0]
    link = tuple[1]
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        future = executor.submit(time_critical, link, i)
        try:
            future.result(timeout=2)  # time_critical does not return a result other than None
        except TimeoutError:
            # If the call is still queued, cancel it so it never runs late.
            # A call that is already running cannot be interrupted; cancel()
            # then simply returns False.
            future.cancel()
            handle_exception(link, i)
    return link * link_number
def time_critical(link, trial_number):
    """Stand-in for the time-limited work; returns None.

    Sleeps for 3 seconds only for link 'd' on trial 7, which is how the demo
    provokes a timeout in a caller that waits less than that.
    """
    if link == 'd' and trial_number == 7:
        time.sleep(3)  # generate a TimeoutError
def handle_exception(link, trial_number):
    """Report on standard output that the given trial of the link timed out."""
    print(f'There was a timeout for link {link}, trial number {trial_number}.')
# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
打印:
result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
同时使用线程和多进程
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, TimeoutError
import os
import time
def main():
    """Process every link on a thread pool and print each result.

    The worker tasks run on the thread pool; each worker is handed the
    process pool, to which it submits its time-limited sub-tasks.
    """
    listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
    cpu_count = os.cpu_count()
    with ThreadPoolExecutor(max_workers=cpu_count) as thread_executor, \
            ProcessPoolExecutor(max_workers=cpu_count) as process_executor:
        # pass executor to our worker
        futures = [
            thread_executor.submit(processRunSeveralTimesInParallel, item, process_executor)
            for item in enumerate(listOfLinks)
        ]
        # Collect in submission order so the output follows the list order.
        for future in futures:
            result = future.result()
            print('result is', result)
def processRunSeveralTimesInParallel(tuple, executor):
    """Process one link, allowing each time-critical step at most 2 seconds.

    Args:
        tuple: An ``(index, link)`` pair such as ``enumerate`` yields. The
            name shadows the builtin but is kept so callers are unaffected.
        executor: Executor to which every ``time_critical`` call is submitted.

    Returns:
        ``link * index``; for a string link, the link repeated ``index`` times.
    """
    link_number = tuple[0]
    link = tuple[1]
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        future = executor.submit(time_critical, link, i)
        try:
            future.result(timeout=2)  # time_critical does not return a result other than None
        except TimeoutError:
            # If the call is still queued, cancel it so it never runs late.
            # A call that is already executing cannot be cancelled; cancel()
            # then simply returns False.
            future.cancel()
            handle_exception(link, i)
    return link * link_number
def time_critical(link, trial_number):
    """Stand-in for the time-limited work; returns None.

    Sleeps for 3 seconds only for link 'd' on trial 7, which is how the demo
    provokes a timeout in a caller that waits less than that.
    """
    if link == 'd' and trial_number == 7:
        time.sleep(3)  # generate a TimeoutError
def handle_exception(link, trial_number):
    """Report on standard output that the given trial of the link timed out."""
    print(f'There was a timeout for link {link}, trial number {trial_number}.')
# Run the demo only when executed as a script. The guard also keeps main()
# from running again if this module is imported rather than executed.
if __name__ == '__main__':
    main()
打印:
result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj
仅使用多进程
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process
import os
import time
def main():
    """Process every link on a process pool and print each result."""
    listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
    # Use half of the CPUs for the pool, but never fewer than one worker:
    # os.cpu_count() may return None, and on a single-CPU machine the halved
    # value would be 0, which ProcessPoolExecutor rejects.
    workers = max(1, (os.cpu_count() or 1) // 2)
    with ProcessPoolExecutor(max_workers=workers) as process_executor:
        futures = [
            process_executor.submit(processRunSeveralTimesInParallel, item)
            for item in enumerate(listOfLinks)
        ]
        # Collect in submission order so the output follows the list order.
        for future in futures:
            result = future.result()
            print('result is', result)
def processRunSeveralTimesInParallel(tuple):
    """Process one link, running each time-critical step in its own process.

    Every step gets at most 2 seconds; a step that is still running after
    that is terminated and reported through ``handle_exception``.

    Args:
        tuple: An ``(index, link)`` pair such as ``enumerate`` yields. The
            name shadows the builtin but is kept so callers are unaffected.

    Returns:
        ``link * index``; for a string link, the link repeated ``index`` times.
    """
    link_number = tuple[0]
    link = tuple[1]
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        p = Process(target=time_critical, args=(link, i))
        p.start()
        p.join(timeout=2)  # don't block for more than 2 seconds
        if p.exitcode is None:  # subprocess did not terminate
            p.terminate()  # we will terminate it
            p.join()  # reap the terminated child so it does not linger
            handle_exception(link, i)
    return link * link_number
def time_critical(link, trial_number):
    """Stand-in for the time-limited work; returns None.

    Sleeps for 3 seconds only for link 'd' on trial 7, which is how the demo
    provokes a timeout in a caller that waits less than that.
    """
    if link == 'd' and trial_number == 7:
        time.sleep(3)  # generate a TimeoutError
def handle_exception(link, trial_number):
    """Report on standard output that the given trial of the link timed out."""
    print(f'There was a timeout for link {link}, trial number {trial_number}.')
# Run the demo only when executed as a script. The guard also keeps main()
# from running again if this module is imported rather than executed.
if __name__ == '__main__':
    main()
打印:
result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj