How do I convert a .csv file to .arrow without loading it all into memory?



I found a similar question: Read CSV with PyArrow

In that answer it references sys.stdin.buffer and sys.stdout.buffer, but I'm not entirely sure how to use that to write the .arrow file, or how to name it. I can't seem to find the exact information I'm looking for in the pyarrow documentation. My file won't have any names, but it will have a timestamped index. The file is around 100 GB, so loading it into memory isn't an option. I tried changing the code, but as I suspected, it ends up overwriting the previous file on every pass through the loop.

***This is my first post. I'd like to thank all the contributors who answered 99.9% of my other questions before I even had to ask them.

import sys
import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1     ### used one line chunks for a small test

def main():
    writer = None
    for split in pd.read_csv(sys.stdin.buffer, chunksize=SPLIT_ROWS):
        table = pa.Table.from_pandas(split)
        # Write out to file
        with pa.OSFile('test.arrow', 'wb') as sink:     ### no append mode yet
            with pa.RecordBatchFileWriter(sink, table.schema) as writer:
                writer.write_table(table)
    writer.close()

if __name__ == "__main__":
    main()
Here is the command I use on the command line:
>cat data.csv | python test.py

Following the suggestion from @Pace, you should consider moving the output file creation outside of your reading loop. Something like this:

import sys
import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1     ### used one line chunks for a small test

def main():
    # Write out to file
    with pa.OSFile('test.arrow', 'wb') as sink:     ### no append mode yet
        with pa.RecordBatchFileWriter(sink, table.schema) as writer:
            ### note: table.schema is not yet defined at this point;
            ### the schema has to be determined before the loop (see the adapted solution below)
            for split in pd.read_csv('data.csv', chunksize=SPLIT_ROWS):
                table = pa.Table.from_pandas(split)
                writer.write_table(table)

if __name__ == "__main__":
    main()

You also don't have to use sys.stdin.buffer if you would rather specify the input and output files explicitly. You could then run the script as:

python test.py
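If the file names shouldn't be hard-coded, a minimal sketch (the argument handling below is an assumption, not part of the original answer) could take the input and output paths from sys.argv, derive the schema from the first chunk, and stream the remaining chunks into the same writer:

import sys
import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1000000

def main(csv_path, arrow_path):
    # hypothetical variant: paths come from the command line instead of being hard-coded
    chunks = pd.read_csv(csv_path, chunksize=SPLIT_ROWS)
    first = pa.Table.from_pandas(next(chunks))
    with pa.OSFile(arrow_path, 'wb') as sink:
        with pa.RecordBatchFileWriter(sink, first.schema) as writer:
            writer.write_table(first)
            for split in chunks:
                # pandas may infer different dtypes per chunk; the schema must stay consistent
                writer.write_table(pa.Table.from_pandas(split))

if __name__ == "__main__":
    # e.g. python test.py data.csv test.arrow
    main(sys.argv[1], sys.argv[2])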

By using the with statements, both writer and sink are closed automatically afterwards (in this case when main() returns), so there is no need for an explicit close() call.

A solution adapted from @Martin-Evans' code:

Closed the file after the for loop, as suggested by @Pace.
import sys
import pandas as pd
import pyarrow as pa

SPLIT_ROWS = 1000000

def main():
    schema = pa.Table.from_pandas(pd.read_csv('Data.csv', nrows=2)).schema
    ### reads first two lines to define schema

    with pa.OSFile('test.arrow', 'wb') as sink:
        with pa.RecordBatchFileWriter(sink, schema) as writer:
            for split in pd.read_csv('Data.csv', chunksize=SPLIT_ROWS):
                table = pa.Table.from_pandas(split)
                writer.write_table(table)
            writer.close()

if __name__ == "__main__":
    main()
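One way to sanity-check the result without pulling the whole file back into RAM (a sketch, not part of the original answers) is to memory-map the output and read individual record batches:

import pyarrow as pa

# memory-map the Arrow IPC file and read batches lazily
with pa.memory_map('test.arrow', 'r') as source:
    reader = pa.ipc.open_file(source)
    print(reader.num_record_batches)      # how many chunks were written
    first_batch = reader.get_batch(0)     # only this batch is materialized
    print(first_batch.schema)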

In 2023 you don't need pandas for this. You can chunk through the CSV with pyarrow itself:
import pyarrow as pa
from pyarrow import csv

schema = pa.schema([
    ('time', pa.timestamp('ms', None)),
    ('deviceid', pa.utf8())
])

convert_dict = {
    'time': pa.timestamp('ms', None),
    'deviceid': pa.utf8()
}

convert_options = pa.csv.ConvertOptions(
    column_types=convert_dict,
    strings_can_be_null=True,
    quoted_strings_can_be_null=True,
    timestamp_parsers=["%Y-%m-%d %H:%M:%S"],
)

arrowfile = "data_dst.arrow"
csvfile = "data_src.csv"

with pa.OSFile(arrowfile, 'wb') as sink:     ### no append mode yet
    with pa.csv.open_csv(csvfile, convert_options=convert_options) as reader:
        with pa.RecordBatchFileWriter(sink, schema) as writer:
            for next_chunk in reader:
                if next_chunk is None:
                    break
                if writer is None:
                    break
                next_table = pa.Table.from_batches([next_chunk])
                writer.write_table(next_table)
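If the default chunking is too small or too large for a file of this size, the amount of CSV read per record batch can be tuned through csv.ReadOptions; the 64 MB block size below is only an illustration, and the reader would be opened with these options alongside the convert_options defined above:

from pyarrow import csv

# roughly 64 MB of CSV text per record batch; tune to your data and memory budget
read_options = csv.ReadOptions(block_size=64 * 1024 * 1024)

# reader = csv.open_csv(csvfile, read_options=read_options,
#                       convert_options=convert_options)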
