Reverse of itertools.groupby?



I'm composing generators for some data processing. First I batch the data so it can be threaded through API calls, like this:

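from itertools import count, groupby
from typing import Any, List

def batch(data: List[Any], size=4):
    c = count()  # running index of the items seen so far
    # Integer-dividing the index by `size` gives each run of `size` items the same key.
    for _, g in groupby(data, lambda _: next(c) // size):
        yield g  # g is a lazy group iterator, not a list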

Then I feed the batches to threads that perform the API calls:

from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Iterable

def thread(data: Iterable, func: Callable, n=4):
    with ThreadPoolExecutor(max_workers=n) as executor:
        for batch in data:
            yield executor.map(func, batch)  # yields one result iterator per batch

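Now I'm trying to merge the batches back into a single list/generator so the results can be used downstream in the generator pipeline. I tried this: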

from itertools import chain
from typing import Iterable

def flat_map(batches: Iterable):
    for i in list(chain(batches)):
        yield i

But i still seems to be a generator rather than an individual item?

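You want chain(*batches) or chain.from_iterable(batches). chain(batches) basically just yields the same values as iterating over batches directly; it only adds a layer of wrapping. So the correct code (without the list wrapping, which is almost certainly wrong) is: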

from itertools import chain
from typing import Iterable
def flat_map(batches: Iterable):
    # chain(*batches) also works, but unpacking an iterator forces it to run to completion first;
    # chain.from_iterable can start as soon as the first batch is ready.
    return chain.from_iterable(batches)

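You don't even need yield, because the iterator already produces the results you want. If you need it to be a true generator, just replace return with yield from to get a similar result.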
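To make the difference concrete, here is a small sketch. The helpers make_batches and endless_batches are hypothetical stand-ins for the batched results, not part of the original code; they only mimic "an iterable of iterators".

```python
from itertools import chain, islice

def make_batches():
    # Hypothetical stand-in for the batched results: an iterator of iterators.
    return iter([iter([1, 2]), iter([3, 4]), iter([5])])

# chain(batches) treats `batches` as its only input, so it hands back
# each inner iterator unchanged instead of the items inside it.
wrapped = list(chain(make_batches()))
still_iterators = all(hasattr(x, "__next__") for x in wrapped)

# chain.from_iterable goes one level deeper and yields the items themselves.
flat = list(chain.from_iterable(make_batches()))

def endless_batches():
    # Hypothetical never-ending source of batches.
    n = 0
    while True:
        yield [n, n + 1]
        n += 2

# Works because chain.from_iterable pulls batches lazily;
# chain(*endless_batches()) would never return.
first_three = list(islice(chain.from_iterable(endless_batches()), 3))

print(still_iterators)  # True
print(flat)             # [1, 2, 3, 4, 5]
print(first_three)      # [0, 1, 2]
```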

Also note: you can avoid needing this function at all by changing

yield executor.map(func, batch)

to:

yield from executor.map(func, batch)

That way the output of thread is already flat to begin with.
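As a rough sketch of that change, assuming a hypothetical fake_api_call in place of the real API function, thread with yield from hands out individual results rather than one iterator per batch:

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable, Iterable, Iterator

def thread(data: Iterable[Iterable[Any]], func: Callable[[Any], Any], n: int = 4) -> Iterator[Any]:
    with ThreadPoolExecutor(max_workers=n) as executor:
        for batch in data:
            # executor.map returns results in input order, so `yield from`
            # re-yields them one by one, already flattened.
            yield from executor.map(func, batch)

def fake_api_call(x: int) -> int:
    # Hypothetical placeholder for the real API call.
    return x * 10

results = list(thread([[1, 2], [3, 4], [5]], fake_api_call))
print(results)  # [10, 20, 30, 40, 50]
```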

So I ended up condensing the three functions into one:

from concurrent.futures import ThreadPoolExecutor
from itertools import count, groupby
from typing import Callable, Iterable

def spread(data: Iterable, func: Callable, n=4):
    """Combines `batch`, `thread` and `flat_map`."""
    c = count()
    with ThreadPoolExecutor(max_workers=n) as executor:
        # n serves as both the batch size and the worker count
        for _, batch in groupby(data, lambda _: next(c) // n):
            yield from executor.map(func, batch)

So all I needed was yield from to make this work. Thanks @ShadowRanger!
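For anyone who wants to try the combined function, here is a minimal usage sketch. The fake_api_call function is a hypothetical placeholder for the real API call, and spread is repeated with the count import so the snippet runs on its own.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import count, groupby
from typing import Any, Callable, Iterable, Iterator

def spread(data: Iterable[Any], func: Callable[[Any], Any], n: int = 4) -> Iterator[Any]:
    """Batch `data` into runs of `n`, map `func` over each run in threads, yield flat results."""
    c = count()
    with ThreadPoolExecutor(max_workers=n) as executor:
        for _, batch in groupby(data, lambda _: next(c) // n):
            yield from executor.map(func, batch)

def fake_api_call(x: int) -> int:
    # Hypothetical placeholder for the real API call.
    return x * x

# The input can itself be a generator, so spread slots into a generator pipeline.
squares = list(spread((i for i in range(10)), fake_api_call, n=4))
print(squares)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```

Since spread is a generator, no work starts until something downstream begins iterating over it.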
