有没有一种方法可以让ProcessPoolExecutor在所有工作人员都很忙的时候有一个有界队列来处理传入请求?
在文档中,没有解释如果调用submit((并且所有工作人员都很忙会发生什么。然而,我做了一些研究,发现ProcessPoolExecutor有自己的内部队列,它显然是无限的。一般来说,使用未绑定队列不是一个好的做法,因为Executor可能被用来破坏系统(DoS攻击(。如果"some_function"运行时间过长,并且接收到大小较大的参数,那么类似的操作很容易导致系统崩溃。
with ProcessPoolExecutor(max_workers=5) as executor:
for arg in range(10000000000000):
future = executor.submit(some_function, args)
我想知道是否有办法限制内部队列的大小,或者让它使用外部队列?
避免篡改ProcessPoolExecutor
内部的一种更干净的方法是使用BoundedSemaphore,该信号量在每次提交任务时增加,在每次完成任务时减少。
这具有阻止提交而不是引发错误的优点。
你可以从这个要点中找到一个有效的例子。
正如我的问题ProcessPoolExecutor中所提到的,它有自己的内部队列,该队列是无边界的。但是,ProcessPoolExecutor_queue_count统计活动请求数(正在运行+挂起(。
对我来说,这是有界的,只需在ProcessPoolExecutor上创建一个包装器来检查计数器,并在数量超过所需的最大队列大小时抛出一些运行时异常:
self._max_queue_size = self._max_workers + max_queue_size
然后:
def submit(self, fn, *args, **kwargs) -> Future:
if self._executor._queue_count >= self._max_queue_size:
raise RuntimeError(
f"{self.__class__.__name__} has reached its maximum of "
f"{self._max_queue_size} active (running + queued) requests.")
return self._executor.submit(fn, *args, **kwargs)
可能不是最好或最干净的解决方案,但它确实为我做了工作。