从DB2中读取一个大表(有些表有1亿个块(后,我使用itertools.isice将生成器对象转换为迭代器。我将迭代器传递给multiprocessing pool.map,后者调用一个函数将这些块并行提取到CSV。
它可以工作,但在并行运行开始之前,python pool.map会将ITERATOR转换为LIST,这会消耗大量时间。有没有一种方法可以避免创建列表或更快地转换为列表?我也尝试过使用POOL.IMAP,但当我运行该程序时,我的笔记本内核就死了。要使用IMAP,我必须将迭代器转换为一个列表,这需要一段时间。有什么想法吗?
generator_df = pd.read_sql(query2, test_connection_forbankcv_connection, chunksize = 5000)
iterable_slice = list(it.islice(generator_df, slice_start,slice_end))
results = p.imap(chunk_to_csv, iterable_slice, 1)
我马上承认,这个解决方案有一些问题,但它展示了基本思想:
import itertools
from typing import Iterable
from multiprocessing import Pool
class Lengthed_ISlice:
def __init__(self, iterable: Iterable, start: int, stop: int):
self._start = start
self._stop = stop
self._islice = itertools.islice(iterable, self._start, self._stop)
def __len__(self):
return self._stop - self._start
def __iter__(self):
return iter(self._islice)
这是islice
对象上的一个瘦包装,它实现了所需的__len__
方法,以便与Pool
的map
方法一起工作:
def double(n):
return n * 2
my_list = list(range(10, 100))
with Pool() as p:
print(p.map(double, Lengthed_ISlice(my_list, 2, 9)))
# Prints [24, 26, 28, 30, 32, 34, 36]
主要问题:
- 除了
__iter__
之外,它没有正确地将任何功能委托给底层islice
。如果您在扩展此方法的使用范围时遇到有关缺少方法的错误,则需要实现正确的方法 - 为了简洁起见,我没有考虑步骤,因为您没有使用非默认步骤,而且它们会使数学稍微复杂一些
- 我并不担心使用
Iterable
的泛型参数。如果您想要更好的类型提示,那么应该为构造函数参数和__iter__
引入TypeVar