I've implemented a process pool using concurrent.futures.ProcessPoolExecutor, but I've noticed that when I print out pool._queue_count, it keeps growing every time I submit a new work item to the pool. Why does it do this, and is it going to be a problem?
Here is the logging output I'm currently getting:
2022-12-06 15:37:31,934 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:31,934 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:31,935 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:31,935 - DEBUG | Pool queue size: 329
2022-12-06 15:37:31,935 - DEBUG | Pool processes: {19113: <ForkProcess name='ForkProcess-2' pid=19113 parent=19104 started>, 19114: <ForkProcess name='ForkProcess-3' pid=19114 parent=19104 started>}
2022-12-06 15:37:31,935 - DEBUG | Pool pending work: {328: <concurrent.futures.process._WorkItem object at 0x7f247f7be2e0>}
2022-12-06 15:37:41,946 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:41,946 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:41,946 - DEBUG | Running automation 'xxx' with internal automation id 'xxx'
2022-12-06 15:37:41,947 - DEBUG | Pool queue size: 330
2022-12-06 15:37:41,947 - DEBUG | Pool processes: {19113: <ForkProcess name='ForkProcess-2' pid=19113 parent=19104 started>, 19114: <ForkProcess name='ForkProcess-3' pid=19114 parent=19104 started>}
2022-12-06 15:37:41,947 - DEBUG | Pool pending work: {329: <concurrent.futures.process._WorkItem object at 0x7f247f7be6a0>}
Notice that the pool queue size is now reported as 330 - but I don't understand what that means or why it is so high. For some reason it just increases by one each time.
I can't paste all of the code since there's quite a lot of it, but here is a slightly condensed version with the snippets I felt were irrelevant cut out:
futures = []
with mp.Manager() as manager:
    last_execution = time.perf_counter()
    pool = ProcessPoolExecutor()
    while True:
        current_time = time.perf_counter()
        if current_time - last_execution < 10 and not first_run:
            time.sleep(1)
        else:
            last_execution = current_time
            for automation_file in automation_files:
                with open(automation_file, "r") as f:
                    automation_config = json.load(f)
                automation_name = os.path.splitext(os.path.basename(automation_file))[0]
                automation_log = os.path.join(log_dir, f"{automation_name}.log")
                automation_type = automation_config["type"]
                if automation_type == "task":
                    automation = pyba.AutomationTask(automation_name, automation_config, automation_log, api_1, api_2)
                else:
                    logger.error(f"Unknown automation type in '{os.path.basename(automation_file)}', skipping")
                    continue
                logger.debug(f"Running automation '{automation.name}' with internal automation id '{automation._id}'")
                future = pool.submit(automation.run, args=(session_1, session_2, stop_app_event))
                futures.append(future)
            logger.debug(f"Pool queue size: {pool._queue_count}")
            logger.debug(f"Pool processes: {pool._processes}")
            logger.debug(f"Pool pending work: {pool._pending_work_items}")
Basically, we get a bunch of automation files, parse them, and then run each of them in a new process using the process pool. We then wait for a given interval (10 seconds here for testing) and do exactly the same thing again.
Right now, however, there is nothing for these automations to actually process, because I'm testing and haven't created any test records for them... so I can't see why the queue size should grow this large over time.
The number of CPUs on my test server is 2 - so there should only be two processes in the pool, right?
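For what it's worth, ProcessPoolExecutor defaults max_workers to os.cpu_count() when you don't pass it, so two worker processes is what I'd expect on this box. A quick sanity check (illustrative snippet only, not part of my app; _max_workers is a private attribute, read here purely for debugging):

import os
from concurrent.futures import ProcessPoolExecutor

print(os.cpu_count())         # 2 on the test server
pool = ProcessPoolExecutor()  # max_workers=None falls back to os.cpu_count()
print(pool._max_workers)      # the worker count the pool will actually use
pool.shutdown()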
I don't think memory or CPU is a problem here:
-bash-4.2$ ps aux | head -1; ps aux | grep -iE 'python3.9|19104' | grep -v grep | sort -rnk 4
USER PID %CPU %MEM VSZ RSS TTY STAT START TIME COMMAND
admin 19114 0.0 0.0 225584 15648 pts/1 S+ 14:42 0:00 python3.9 app.py
admin 19113 0.0 0.0 225584 15612 pts/1 S+ 14:42 0:00 python3.9 app.py
admin 19107 0.0 0.0 520492 15376 pts/1 Sl+ 14:42 0:01 python3.9 app.py
admin 19104 0.0 0.0 374080 20248 pts/1 Sl+ 14:42 0:02 python3.9 app.py
The last thing to mention is that I've implemented a graceful stop solution using signals (a rough sketch of it follows the log below). When I send a stop signal to the app, it stops gracefully almost immediately - which indicates that it isn't doing any processing despite the large queue count. That only adds to the confusion:
2022-12-06 16:16:05,505 - DEBUG | Pool queue size: 560
2022-12-06 16:16:05,506 - DEBUG | Pool processes: {19113: <ForkProcess name='ForkProcess-2' pid=19113 parent=19104 started>, 19114: <ForkProcess name='ForkProcess-3' pid=19114 parent=19104 started>}
2022-12-06 16:16:05,506 - DEBUG | Pool pending work: {559: <concurrent.futures.process._WorkItem object at 0x7f247f738160>}
2022-12-06 16:16:12,516 - DEBUG | Received a signal to stop the app, setting the stop flag
2022-12-06 16:16:12,516 - DEBUG | Cancelling all scheduled pending work
2022-12-06 16:16:12,518 - DEBUG | Shutting down the process pool
2022-12-06 16:16:12,522 - DEBUG | Process pool shut down successfully, app stopped
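In case it matters, the stop handling works roughly along these lines (a simplified, illustrative sketch rather than my exact code - the names and the dummy wait_for_stop worker are made up for the example; cancel_futures requires Python 3.9+, and signal.pause() is Unix-only):

import logging
import signal
import multiprocessing as mp
from concurrent.futures import ProcessPoolExecutor

logger = logging.getLogger(__name__)

def wait_for_stop(stop_event):
    # stand-in for an automation's run method: it just waits for the shared flag
    stop_event.wait()

if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)
    futures = []
    with mp.Manager() as manager:
        stop_app_event = manager.Event()  # shared flag the worker processes can check
        pool = ProcessPoolExecutor()

        def handle_stop(signum, frame):
            logger.debug("Received a signal to stop the app, setting the stop flag")
            stop_app_event.set()

        signal.signal(signal.SIGTERM, handle_stop)
        signal.signal(signal.SIGINT, handle_stop)

        futures.append(pool.submit(wait_for_stop, stop_app_event))

        signal.pause()  # block until SIGTERM/SIGINT arrives and handle_stop runs
        logger.debug("Cancelling all scheduled pending work")
        for future in futures:
            future.cancel()  # only cancels work items that have not started yet
        logger.debug("Shutting down the process pool")
        pool.shutdown(wait=True, cancel_futures=True)
        logger.debug("Process pool shut down successfully, app stopped")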
_queue_count is just a sequential work item ID: every call to submit() hands out the next ID and increments the counter, and it never decreases - it is not the number of items waiting in the queue. You shouldn't be reading it manually anyway (that's what the leading underscore in its name means!).
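If you want to see the behaviour in isolation, here is a minimal sketch (it pokes at the same private attributes you are already logging, so treat it purely as a debugging aid):

from concurrent.futures import ProcessPoolExecutor

def noop():
    return None

if __name__ == "__main__":
    pool = ProcessPoolExecutor(max_workers=2)
    for _ in range(5):
        future = pool.submit(noop)
        future.result()  # by now the finished item has been removed from _pending_work_items
        # _pending_work_items is empty again, yet _queue_count keeps climbing
        print(pool._queue_count, pool._pending_work_items)
    pool.shutdown()

On CPython, _queue_count is simply the ID that will be assigned to the next work item, so it only ever goes up. If you need to know how much work is actually outstanding, len(pool._pending_work_items) is the closer number, but the supported approach is what you are already doing: keep your own list of futures and count the ones that are not done() yet.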