


import multiprocessing
from multiprocessing.dummy import Pool
def main():
    """Run processRunSeveralTimesInParallel once per link on a pool of 2 threads."""
    listOfLinks = []
    # multiprocessing.dummy.Pool is a thread pool, so the workers run in
    # threads of this process. The `with` block releases the pool when done.
    with Pool(2) as threadpool:
        # Each worker call receives one (index, link) pair from enumerate().
        # map() blocks until every call has finished.
        info = threadpool.map(processRunSeveralTimesInParallel, enumerate(listOfLinks))
def processRunSeveralTimesInParallel(listOfLinks):
# NOTE(review): the body below is pseudo code from the question, not valid
# Python. It sketches the desired behaviour: on each of 10 iterations, run an
# operation for at most n seconds, and switch to the "except" branch once that
# time has elapsed.
# NOTE(review): main() passes one enumerate() item per call, so despite its
# name this argument appears to be a single (index, link) pair - confirm.
#The following is pseudo code representing what I would like to do:
for i in range(0,10):
try for n seconds:
except (after n seconds):
return something


File "file.py", line 388, in main
info = threadpool.starmap(processRunSeveralTimesInParallel,zip(enumerate(listOfLinks)))
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 372, in starmap
return self._map_async(func, iterable, starmapstar, chunksize).get()
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 771, in get
raise self._value
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 125, in worker
result = (True, func(*args, **kwds))
File "/Users/user/anaconda3/envs/proj/lib/python3.8/multiprocessing/pool.py", line 51, in starmapstar
return list(itertools.starmap(args[0], args[1]))
File "file.py", line 193, in processRunSeveralTimesInParallel
signal.signal(signal.SIGALRM, signal_handler)
File "/Users/user/anaconda3/envs/proj/lib/python3.8/signal.py", line 47, in signal
handler = _signal.signal(_enum_to_int(signalnum), _enum_to_int(handler))
ValueError: signal only works in main thread



  • 我使用 multiprocessing 库同时并行运行多个进程。从上面的错误信息来看,我怀疑 signal 库与 multiprocessing 库存在冲突。
  • try 语句中调用的是 selenium 的 find_element_by_xpath() 方法,但该方法没有可用的超时参数。




from concurrent.futures import ThreadPoolExecutor, TimeoutError
import time

def main():
    """Submit one worker per link to a thread pool and print each result.

    Results are printed in submission order because the futures are
    consumed in the order they were created.
    """
    listOfLinks = ['a', 'b', 'c', 'd', 'e']
    # To prevent timeout errors due to lack of threads, you need at least one
    # extra thread in addition to the ones being created here so that at least
    # one time_critical thread can start. Of course, ideally you would like all
    # the time_critical threads to be able to start without waiting. So,
    # whereas the minimum number of max_workers would be 6 in this case, the
    # ideal number would be 5 * 2 = 10.
    with ThreadPoolExecutor(max_workers=10) as executor:
        # pass executor to our worker so it can submit its own timed sub-tasks
        futures = [
            executor.submit(processRunSeveralTimesInParallel, item, executor)
            for item in enumerate(listOfLinks)
        ]
        for future in futures:
            result = future.result()
            print('result is', result)

def processRunSeveralTimesInParallel(tuple, executor):
    """Run ten time-limited trials for one link and return a result string.

    Args:
        tuple: An (index, link) pair as produced by enumerate().
        executor: Executor used to run each time_critical call so that the
            wait for it can be bounded.

    Returns:
        The link repeated ``index`` times.
    """
    link_number, link = tuple[0], tuple[1]
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        future = executor.submit(time_critical, link, i)
        try:
            # time_critical does not return a result other than None; we only
            # wait here so that a slow call surfaces as TimeoutError.
            future.result(timeout=2)
        except TimeoutError:
            # The timed-out call keeps running in its pool thread; only the
            # wait is abandoned.
            handle_exception(link, i)
    return link * link_number

def time_critical(link, trial_number):
    """Stand-in for the time-limited step.

    Sleeps for 3 seconds only when ``link`` is 'd' and ``trial_number`` is 7;
    every other call returns immediately. Always returns None.
    """
    if link == 'd' and trial_number == 7:
        time.sleep(3)  # long enough to exceed the caller's 2-second timeout

def handle_exception(link, trial_number):
    """Print a message reporting that the given trial of ``link`` timed out."""
    print(f'There was a timeout for link {link}, trial number {trial_number}.')

# Run the demo only when executed as a script, not when imported.
if __name__ == '__main__':
    main()


result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee


from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, TimeoutError
import os
import time

def main():
    """Run one worker thread per link, each delegating timed work to a process pool.

    Results are printed in submission order because the futures are
    consumed in the order they were created.
    """
    listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
    cpu_count = os.cpu_count()
    with ThreadPoolExecutor(max_workers=cpu_count) as thread_executor, \
            ProcessPoolExecutor(max_workers=cpu_count) as process_executor:
        # pass the process executor to our worker; the worker itself runs in
        # a thread and uses the processes only for the time-limited part
        futures = [
            thread_executor.submit(processRunSeveralTimesInParallel, item, process_executor)
            for item in enumerate(listOfLinks)
        ]
        for future in futures:
            result = future.result()
            print('result is', result)

def processRunSeveralTimesInParallel(tuple, executor):
    """Run ten time-limited trials for one link and return a result string.

    Args:
        tuple: An (index, link) pair as produced by enumerate().
        executor: Executor used to run each time_critical call so that the
            wait for it can be bounded.

    Returns:
        The link repeated ``index`` times.
    """
    link_number, link = tuple[0], tuple[1]
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        future = executor.submit(time_critical, link, i)
        try:
            # time_critical does not return a result other than None; we only
            # wait here so that a slow call surfaces as TimeoutError.
            future.result(timeout=2)
        except TimeoutError:
            handle_exception(link, i)
    return link * link_number

def time_critical(link, trial_number):
    """Stand-in for the time-limited step.

    Sleeps for 3 seconds only when ``link`` is 'd' and ``trial_number`` is 7;
    every other call returns immediately. Always returns None.
    """
    if link == 'd' and trial_number == 7:
        time.sleep(3)  # long enough to exceed the caller's 2-second timeout

def handle_exception(link, trial_number):
    """Print a message reporting that the given trial of ``link`` timed out."""
    print(f'There was a timeout for link {link}, trial number {trial_number}.')

# The guard is required here: process-pool workers may re-import this module,
# and main() must not run again in them.
if __name__ == '__main__':
    main()


result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj


from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Process
import os
import time

def main():
    """Run one pool task per link and print each result in submission order."""
    listOfLinks = ['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j']
    # Use half the cores for pool workers (each worker spawns a further
    # subprocess per trial). Clamp to at least 1: cpu_count() may return None
    # or 1, and max_workers must be greater than 0.
    workers = max(1, (os.cpu_count() or 1) // 2)
    with ProcessPoolExecutor(max_workers=workers) as process_executor:
        futures = [
            process_executor.submit(processRunSeveralTimesInParallel, item)
            for item in enumerate(listOfLinks)
        ]
        for future in futures:
            result = future.result()
            print('result is', result)

def processRunSeveralTimesInParallel(tuple):
    """Run ten time-limited trials for one link, killing any that overrun.

    Each trial runs time_critical in its own subprocess so that a trial
    exceeding 2 seconds can actually be terminated.

    Args:
        tuple: An (index, link) pair as produced by enumerate().

    Returns:
        The link repeated ``index`` times.
    """
    link_number, link = tuple[0], tuple[1]
    # long running sequence of instructions up until this point and then
    # allow 2 seconds for this part:
    for i in range(10):
        p = Process(target=time_critical, args=(link, i))
        p.start()  # join() on a process that was never started raises
        p.join(timeout=2)  # don't block for more than 2 seconds
        if p.exitcode is None:  # subprocess did not terminate
            p.terminate()  # we will terminate it
            p.join()  # reap the terminated child
            handle_exception(link, i)
    return link * link_number

def time_critical(link, trial_number):
    """Stand-in for the time-limited step.

    Sleeps for 3 seconds only when ``link`` is 'd' and ``trial_number`` is 7;
    every other call returns immediately. Always returns None.
    """
    if link == 'd' and trial_number == 7:
        time.sleep(3)  # long enough to outlast the caller's 2-second join timeout

def handle_exception(link, trial_number):
    """Print a message reporting that the given trial of ``link`` timed out."""
    print(f'There was a timeout for link {link}, trial number {trial_number}.')

# The guard is required here: child processes may re-import this module, and
# main() must not run again in them.
if __name__ == '__main__':
    main()


result is
result is b
result is cc
There was a timeout for link d, trial number 7.
result is ddd
result is eeee
result is fffff
result is gggggg
result is hhhhhhh
result is iiiiiiii
result is jjjjjjjjj


  • 没有找到相关文章
