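I have a large number of tasks that I want to execute, and I want to get the results through a generator. However, using `ProcessPoolExecutor` and `as_completed` evaluates the results eagerly and keeps them all in memory. Is there a way to block once a certain number of results are being held in the generator?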
The idea is to split what you want to process into chunks. I'll use almost the same example as the one in the `ProcessPoolExecutor` documentation:
```python
import concurrent.futures
import math
import itertools as it

PRIMES = [
    293, 171, 293, 773, 99, 5419,
    293, 171, 293, 773, 99, 5419,
    293, 171, 293, 773, 99, 5419]


def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))


def main_lazy():
    chunks = map(lambda x: it.islice(PRIMES, x, x + 4), range(0, len(PRIMES), 4))
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = zip(PRIMES,
                      it.chain.from_iterable(map(lambda x: executor.map(is_prime, x),
                                                 chunks)))
        for number, prime in (next(results) for _ in range(4)):
            print('%d is prime: %s' % (number, prime))


if __name__ == "__main__":
    main_lazy()
```
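Note the difference between `main` and `main_lazy`; let me explain it a bit:

Instead of mapping the executor over the whole list of things we want to process, I split the list into chunks of size 4 (`itertools.islice` is useful for this) and map over the chunks instead. Then, thanks to Python 3's lazy `map`, the `executor.map` call for each chunk is deferred. `executor.map` itself is not lazy, so when a chunk is requested all of its items are submitted at once, but `executor.map` is not called for a chunk until that chunk is actually requested. As you can see, I only request the first 4 elements of the whole result list, and because I also used `itertools.chain`, only the elements of the first chunk are consumed; the rest of the iterable is never computed.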
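The chain-of-lazy-maps trick can be checked directly by recording when each chunk is handed to the executor. This is a minimal sketch, assuming a `ThreadPoolExecutor` in place of `ProcessPoolExecutor` so it runs anywhere; `lazy_chunked_map`, `square` and the `log` list are hypothetical names used only for illustration.

```python
import itertools as it
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Callable, Iterator


def lazy_chunked_map(fn: Callable[[int], int], data: list[int], chunk_size: int,
                     executor: Executor, log: list[int]) -> Iterator[int]:
    def run_chunk(start: int) -> Iterator[int]:
        log.append(start)  # record the moment this chunk is handed to the executor
        # executor.map submits every item of this chunk immediately
        return executor.map(fn, it.islice(data, start, start + chunk_size))

    # map() and chain.from_iterable() are both lazy, so run_chunk is only
    # called when chain needs the next chunk
    return it.chain.from_iterable(map(run_chunk, range(0, len(data), chunk_size)))


def square(n: int) -> int:
    return n * n


with ThreadPoolExecutor(max_workers=2) as ex:
    log: list[int] = []
    results = lazy_chunked_map(square, list(range(10)), 4, ex, log)
    print(log)                                # []: nothing submitted yet
    print([next(results) for _ in range(4)])  # [0, 1, 4, 9]
    print(log)                                # [0]: only the first chunk
    print(list(results))                      # [16, 25, 36, 49, 64, 81]
    print(log)                                # [0, 4, 8]
```

Taking the first four results only ever submits the first chunk; later chunks are submitted as the consumer reaches them.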
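So, since you want a generator, it is as simple as handing the results back from the `main_lazy` function, and you can even make the chunk size a parameter (you would probably want a proper function to build the chunks, but that is out of scope here). One caveat: the results must be consumed while the executor is still open, i.e. inside the `with` block, because an executor that has been shut down cannot accept the submissions for later chunks: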
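```python
def main_lazy(chunk_size):
    chunks = map(lambda x: it.islice(PRIMES, x, x + chunk_size),
                 range(0, len(PRIMES), chunk_size))
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = zip(PRIMES,
                      it.chain.from_iterable(map(lambda x: executor.map(is_prime, x),
                                                 chunks)))
        # yield (rather than return) so the executor stays open while the
        # caller consumes the results; returning here would shut the pool
        # down and the next chunk would fail with
        # "RuntimeError: cannot schedule new futures after shutdown"
        yield from results
```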
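For the "proper function to get the chunks", one option is a small helper that works on any iterable (not only a list with a known length), combined with a generator that submits one chunk at a time. This is a sketch under those assumptions; `chunked` and `chunked_executor_map` are hypothetical names, and on Python 3.12+ `itertools.batched(iterable, n)` does the same job as `chunked` (it yields tuples instead of lists).

```python
import itertools as it
from concurrent.futures import Executor, ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def chunked(iterable: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield lists of at most `size` items; works on any iterable."""
    iterator = iter(iterable)
    while True:
        # islice pulls at most `size` items from the shared iterator
        chunk = list(it.islice(iterator, size))
        if not chunk:
            return
        yield chunk


def chunked_executor_map(fn: Callable[[T], R], iterable: Iterable[T],
                         executor: Executor, chunk_size: int) -> Iterator[R]:
    """Submit one chunk at a time; the next chunk is read from the input
    only after every result of the current chunk has been yielded."""
    for chunk in chunked(iterable, chunk_size):
        yield from executor.map(fn, chunk)
```

At most `chunk_size` inputs and results are in flight at once. The cost is that workers go idle at the end of each chunk while the slowest task in it finishes. With `ProcessPoolExecutor`, `fn` must be a picklable top-level function.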
I wrote a small gist that implements the desired functionality without the performance cost of batching.
Usage looks like this:
```python
def work(inp: In) -> Out: ...

with ProcessPoolExecutor() as ex:
    # also works with ThreadPoolExecutor
    for out in lazy_executor_map(work, inputs_iterable, ex):
        ...
```
And the implementation itself:
```python
from collections import deque
from concurrent.futures import Executor, Future, wait, FIRST_COMPLETED
from typing import Callable, Iterable, Iterator, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


def lazy_executor_map(
    fn: Callable[[In], Out],
    it: Iterable[In],
    ex: Executor,
    # may want this to be equal to the n_threads/n_processes
    n_concurrent: int = 6,
) -> Iterator[Out]:
    queue: deque[Future[Out]] = deque()  # submitted futures, in input order
    in_progress: set[Future[Out]] = set()  # futures that have not finished yet
    itr = iter(it)
    try:
        while True:
            # top up so that at most n_concurrent tasks are running
            for _ in range(n_concurrent - len(in_progress)):
                el = next(itr)  # this line will raise StopIteration when finished
                # - which will get caught by the try: except: below
                fut = ex.submit(fn, el)
                queue.append(fut)
                in_progress.add(fut)
            _, in_progress = wait(in_progress, return_when=FIRST_COMPLETED)
            # iterate over the queue, yielding outputs if available in the order they came in with
            while queue and queue[0].done():
                yield queue.popleft().result()
    except StopIteration:
        # input exhausted: drain the remaining futures in order
        wait(queue)
        for fut in queue:
            yield fut.result()
```
I haven't compared it with the batched version yet, but it seems to perform well.
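In `lazy_executor_map` only the number of *running* tasks is capped: if an early task is slow, later tasks keep finishing and their results accumulate in `queue` until the early one completes. If a hard cap on held results is what matters (as in the question), a simpler sliding-window variant blocks on the oldest future once the window is full. This is a sketch, not the gist's code; `bounded_ordered_map` and `max_pending` are hypothetical names.

```python
from collections import deque
from concurrent.futures import Executor, Future, ThreadPoolExecutor
from typing import Callable, Iterable, Iterator, TypeVar

In = TypeVar("In")
Out = TypeVar("Out")


def bounded_ordered_map(fn: Callable[[In], Out], iterable: Iterable[In],
                        executor: Executor, max_pending: int = 6) -> Iterator[Out]:
    """Yield fn(x) for each x in input order, never holding more than
    `max_pending` submitted-but-unyielded futures."""
    pending: deque[Future[Out]] = deque()
    for item in iterable:
        pending.append(executor.submit(fn, item))
        if len(pending) >= max_pending:
            # Window is full: block on the oldest task before reading more input.
            yield pending.popleft().result()
    # Input exhausted: drain what is left, still in input order.
    while pending:
        yield pending.popleft().result()
```

The trade-off is the reverse of the gist's: memory is strictly bounded, but while the oldest task is still running no new work is submitted, so some workers may sit idle.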