I want to convert my CSV file to a parquet file. Regardless of the chunksize parameter, the code below causes the kernel to be killed. I don't know how many rows x columns my file has, but I suspect I have a lot of columns. What is the ideal solution?
With pandas:
import pandas as pd
import dask.dataframe as dd
import pyarrow as pa
import pyarrow.parquet as pq
csv_file = "kipan_exon.csv.gz"
parquet_file = "kipan_exon.csv.gz"
chunksize = 1000000
df = pd.read_csv(csv_file, sep="\t", chunksize=chunksize, low_memory=False, compression="gzip")
for i, chunk in enumerate(df):
    print("Chunk", i)
    if i == 0:
        parquet_schema = pa.Table.from_pandas(df=chunk).schema
        parquet_writer = pq.ParquetWriter(parquet_file, parquet_schema, compression="gzip")
    table = pa.Table.from_pandas(chunk, schema=parquet_schema)
    parquet_writer.write_table(table)
parquet_writer.close()
With dask:
df = dd.read_csv(csv_file, sep="\t", compression="gzip", blocksize=None)
df = df.repartition(partition_size="100MB")
df.to_parquet(parquet_file, write_index=False)
Another (more recent) solution is to use the LazyFrame approach in polars:
csv_file = "kipan_exon.csv" # this doesn't work with compressed files right now
parquet_file = "kipan_exon.parquet" # @MichaelDelgado's comment re: same value as `csv_file`
from polars import scan_csv
ldf = scan_csv(csv_file)
ldf.sink_parquet(parquet_file)
This should work well in memory-constrained situations, since the data is not loaded fully, but rather streamed into the parquet file.
When using dask for the csv-to-parquet conversion, I would recommend avoiding .repartition. It introduces additional data shuffling that can overwhelm the workers and the scheduler. The simpler approach would look like this:
csv_file = "kipan_exon.csv.gz"
parquet_file = "kipan_exon.parquet" # @MichaelDelgado's comment re: same value as `csv_file`
from dask.dataframe import read_csv
df = read_csv(csv_file, sep="\t", compression="gzip")
df.to_parquet(parquet_file, write_index=False)