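I have a multiprocessing pool running, and I then create a thread. The pool uses a multiprocessing Manager dict so that the dictionary is shared between processes: as each process is created, it adds an entry to the dictionary with its process ID as the key and, as the value, the asset that process needs to look after.
In some cases a process may fail or be killed. When that happens, I compare the process IDs currently running on the system against those in the dictionary. For any that are missing, I start a new process, which adds its own ID to the shared dictionary, and I remove the old process ID from the dictionary.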
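Comparing against every Python PID on the system can be fooled by unrelated interpreters or by a reused PID. If the workers are started as plain `multiprocessing.Process` objects (an assumption: the public `Pool` API does not hand these objects out), each object can report its own liveness and exit code. A minimal sketch, assuming the POSIX `fork` start method; `worker`, `find_dead` and `demo` are hypothetical names:

```python
import multiprocessing
import sys
import time

# Assumption: POSIX "fork" start method (Linux/macOS), so no __main__ guard is needed.
CTX = multiprocessing.get_context("fork")


def worker(fail):
    """Hypothetical stand-in for activateMainProgram: exits with code 42 if `fail`."""
    if fail:
        sys.exit(42)
    time.sleep(2)  # pretend to watch an asset for a while


def find_dead(procs):
    """Return {asset: exitcode} for every tracked process that is no longer running."""
    return {asset: p.exitcode for asset, p in procs.items() if not p.is_alive()}


def demo():
    procs = {
        "asset-a": CTX.Process(target=worker, args=(False,)),
        "asset-b": CTX.Process(target=worker, args=(True,)),
    }
    for p in procs.values():
        p.start()
    procs["asset-b"].join()  # wait until the failing worker has exited
    dead = find_dead(procs)
    for p in procs.values():  # clean up the survivor
        p.terminate()
        p.join()
    return dead
```

Here `demo()` reports only `asset-b`, with the exit code its worker passed to `sys.exit`.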
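The problem is that the dictionary does not seem to update inside the thread. That is, if the thread is created after the processes are created, the dictionary is fine: the thread receives the completed dictionary. But if a new process is created from inside the thread (because a process failed), the thread does not see the update, even though I update the dictionary inside the function that sets up the process.
So even though it is a multiprocessing Manager dict, it seems to be shared only between processes and not with the thread.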
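For comparison, a Manager dict proxy does not keep a local copy: every read is a request to the manager's server process, and each thread that uses the proxy gets its own connection to it. A minimal sketch, assuming the POSIX `fork` start method, with hypothetical helpers `register` and `watch`: a thread in the parent polls the proxy and picks up a key that a child process writes after both were started.

```python
import multiprocessing
import os
import threading
import time

# Assumption: POSIX "fork" start method, so the sketch runs without a __main__ guard.
CTX = multiprocessing.get_context("fork")


def register(shared, asset):
    """Child process: record its own PID (as a string key) with the asset it watches."""
    time.sleep(0.2)  # give the parent time to start its monitoring thread
    shared[str(os.getpid())] = [asset]


def watch(shared, expected_key, seen, timeout=5.0):
    """Thread in the parent: poll the proxy until the child's key appears."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if expected_key in shared:  # each access asks the manager process for fresh data
            seen.append(shared[expected_key])
            return
        time.sleep(0.05)


def demo():
    with CTX.Manager() as mgr:
        shared = mgr.dict()
        child = CTX.Process(target=register, args=(shared, "asset1"))
        child.start()  # fork before the parent has any extra thread
        seen = []
        monitor = threading.Thread(target=watch, args=(shared, str(child.pid), seen))
        monitor.start()
        monitor.join()
        child.join()
        return seen
```

If this picks up the child's entry on your platform, a stale view in the real program more likely comes from the new task never actually running or writing, rather than from the thread.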
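Has anyone run into this, or does anyone know how to share the updated dictionary with a thread that was created after the pool was set up?
Simplified code below: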
import logging
import multiprocessing
import time
from datetime import datetime
from multiprocessing import Pool
from threading import Thread

import psutil

log = logging.getLogger(__name__)
# activateMainProgram and asset1 are defined elsewhere (omitted from this simplified example)


def processactivecheck(process_dict):
    currentpingyoualiveprocesses = datetime.now()
    while True:
        duration = datetime.now() - currentpingyoualiveprocesses
        duration_in_s = duration.total_seconds()
        if duration_in_s >= 300:  # run every 5 minutes
            currentpingyoualiveprocesses = datetime.now()
            process_dict_list = [int(i) for i in process_dict.keys()]  # PIDs registered in the shared dictionary (keys are strings)
            allPyIds = [p.pid for p in psutil.process_iter() if "python" in p.name()]  # get all python processes running on system
            inactiveprocesses = [i for i in process_dict_list if i not in allPyIds]  # identify processes that are not running
            log.info("Current Active Processes : " + str(allPyIds))
            log.info("Processes that are required : " + str(process_dict_list))
            if len(inactiveprocesses) > 0:
                log.info("Processes that are missing : " + str(inactiveprocesses))
                log.error("The following processes have stopped : " + str(inactiveprocesses) + ", restarting")
                for item in inactiveprocesses:
                    # activating fallen process
                    log.error("Process " + str(item) + " has stopped working, reactivating ")
                    item = str(item)
                    log.error("Reactivating for asset : " + str(process_dict[item][0]))
                    processes = pool.apply_async(activateMainProgram, args=(process_dict, process_dict[item][0]))
                    # here the new process is created (to replace the failed process) and is assigned the asset to work on
                    process_dict.pop(item, None)
                    # here we remove the old, now dead process from the dictionary
                    # ISSUE: The newly created process updates the shared dictionary but this change is NOT visible in the Thread (the thread only sees the OLD, original dictionary).
        time.sleep(5)


if __name__ == "__main__":
    mgr = multiprocessing.Manager()
    process_dict = mgr.dict()
    pool = Pool(processes=5)
    for x in range(6):
        processes = pool.apply_async(activateMainProgram, args=(process_dict, asset1))
        # here all the processes are created and they update the dictionary with their IDs
    loggerthread = Thread(target=processactivecheck, args=(process_dict,))  # args must be a tuple
    # thread is created to monitor if any process failed
    loggerthread.daemon = True
    loggerthread.start()
I think you're overcomplicating things. Instead, just use multiprocessing.Process and a loop:
import multiprocessing
import os
import random
import secrets
import sys
import time


def watch_asset(asset):
    print(os.getpid(), "watching", asset)
    while True:
        if random.random() < 0.1:
            print(os.getpid(), "gack! falling down dramatically oh no")
            sys.exit(42)
        time.sleep(0.1)


def main():
    assets = [secrets.token_hex(8) for i in range(5)]
    watchers = {}
    try:
        while True:
            for asset in assets:
                if asset not in watchers:
                    watchers[asset] = multiprocessing.Process(target=watch_asset, args=(asset,))
                    watchers[asset].start()
                    print("Started watcher", watchers[asset].pid, "for", asset)
                    continue
                if not watchers[asset].is_alive():
                    print("Watcher", watchers[asset].pid, "for", asset, "died, restarting soon")
                    watchers.pop(asset)
            time.sleep(1)
    finally:
        print("terminating watchers")
        for watcher in watchers.values():
            watcher.terminate()
        for watcher in watchers.values():
            watcher.join(timeout=5)


if __name__ == '__main__':
    main()
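The question's shared PID-to-asset dictionary fits into this loop if the supervisor, rather than each worker, writes the entries: it knows `proc.pid` right after `start()`, and it is the one that detects deaths. A sketch under stated assumptions: the POSIX `fork` start method, a hypothetical `watch_asset` that fails deterministically after `lifetime` seconds instead of at random, and a bounded number of rounds so it terminates. Unlike the loop above, it restarts a dead watcher in the same pass.

```python
import multiprocessing
import sys
import time

# Assumption: POSIX "fork" start method (Linux/macOS).
CTX = multiprocessing.get_context("fork")


def watch_asset(asset, lifetime):
    """Hypothetical worker: 'watches' an asset for `lifetime` seconds, then fails."""
    time.sleep(lifetime)
    sys.exit(42)


def supervise(assets, registry, rounds, lifetime=0.2, poll=0.5):
    """Restart loop that also keeps registry[str(pid)] = asset current.

    Returns the number of restarts performed.
    """
    watchers = {}
    restarts = 0
    try:
        for _ in range(rounds):
            for asset in assets:
                proc = watchers.get(asset)
                if proc is not None and not proc.is_alive():
                    registry.pop(str(proc.pid), None)  # drop the dead PID
                    del watchers[asset]
                    restarts += 1
                if asset not in watchers:
                    proc = CTX.Process(target=watch_asset, args=(asset, lifetime))
                    proc.start()
                    watchers[asset] = proc
                    registry[str(proc.pid)] = asset  # supervisor writes the mapping itself
            time.sleep(poll)
    finally:
        for proc in watchers.values():
            proc.terminate()
        for proc in watchers.values():
            proc.join(timeout=5)
            registry.pop(str(proc.pid), None)  # leave no stale PIDs behind
    return restarts


if __name__ == "__main__":
    with CTX.Manager() as mgr:
        registry = mgr.dict()
        print("restarts:", supervise(["asset1", "asset2"], registry, rounds=3))
```

While `supervise` runs, any thread or process holding the `registry` proxy reads the current mapping; a separate PID scan is no longer needed.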
This prints, for example:
Started watcher 11066 for 6246bcbc0a8a26ea
Started watcher 11067 for 52b4d6e77982f178
Started watcher 11068 for ed0a9ffb49fc020c
Started watcher 11069 for 6ac711ec1d4042e3
Started watcher 11070 for 912a4f54e4a5f2d8
11066 watching 6246bcbc0a8a26ea
11067 watching 52b4d6e77982f178
11069 watching 6ac711ec1d4042e3
11068 watching ed0a9ffb49fc020c
11070 watching 912a4f54e4a5f2d8
11066 gack! falling down dramatically oh no
11070 gack! falling down dramatically oh no
Watcher 11066 for 6246bcbc0a8a26ea died, restarting soon
Watcher 11070 for 912a4f54e4a5f2d8 died, restarting soon
11068 gack! falling down dramatically oh no
11067 gack! falling down dramatically oh no
Started watcher 11077 for 6246bcbc0a8a26ea
Watcher 11067 for 52b4d6e77982f178 died, restarting soon
Watcher 11068 for ed0a9ffb49fc020c died, restarting soon