我想做什么
我使用PyArrow来读取一些csv并将它们转换为Parquet。我读取的一些文件有很多列,并且内存占用很高(足以使运行作业的机器崩溃)。我试图通过块通过文件,同时以类似的方式读取CSV的熊猫read_csv与chunksize
的工作方式。
例如,pandas中的分块代码是这样工作的:
chunks = pandas.read_csv(data, chunksize=100, iterator=True)
# Iterate through chunks
for chunk in chunks:
do_stuff(chunk)
我想移植一个类似的功能到箭头
我想做什么
我注意到箭头有ReadOptions,其中包括一个block_size
参数,我想也许我可以这样使用它:
# Reading in-memory csv file
arrow_table = arrow_csv.read_csv(
input_file=input_buffer,
read_options=arrow_csv.ReadOptions(
use_threads=True,
block_size=4096
)
)
# Iterate through batches
for batch in arrow_table.to_batches():
do_stuff(batch)
由于这个(block_size
)似乎没有返回迭代器,我的印象是,这仍然会使Arrow读取内存中的整个表,从而重现我的问题。
最后,我知道我可以首先使用Pandas读取csv并通过它进行块处理,然后将其转换为Arrow表。但是我尽量避免使用Pandas,而只使用Arrow。
如有需要,我很乐意提供更多信息
您正在寻找的函数是pyarrow.csv.open_csv
,它返回pyarrow.csv.CSVStreamingReader
。批的大小将由您注意到的block_size
选项控制。完整示例:
import pyarrow as pa
import pyarrow.parquet as pq
import pyarrow.csv
in_path = '/home/pace/dev/benchmarks-proj/benchmarks/data/nyctaxi_2010-01.csv.gz'
out_path = '/home/pace/dev/benchmarks-proj/benchmarks/data/temp/iterative.parquet'
convert_options = pyarrow.csv.ConvertOptions()
convert_options.column_types = {
'rate_code': pa.utf8(),
'store_and_fwd_flag': pa.utf8()
}
writer = None
with pyarrow.csv.open_csv(in_path, convert_options=convert_options) as reader:
for next_chunk in reader:
if next_chunk is None:
break
if writer is None:
writer = pq.ParquetWriter(out_path, next_chunk.schema)
next_table = pa.Table.from_batches([next_chunk])
writer.write_table(next_table)
writer.close()
这个例子还突出了流CSV阅读器带来的挑战之一。它需要返回具有一致数据类型的批。但是,在解析CSV时,通常需要推断数据类型。在我的示例数据中,文件的前几MB具有rate_code
列的整数值。在批处理中间的某个地方,该列有一个非整数值(在本例中为*
)。要解决这个问题,您可以预先指定列的类型,就像我在这里所做的那样。