Directory globbing and read_parquet with partitioned parquet directories on disk



I have a directory of partitioned weather-station readings that I wrote with pandas/pyarrow.

c.to_parquet(path=f"data/{filename}.parquet", engine='pyarrow', compression='snappy', partition_cols=['STATION', 'ELEMENT'])

When I try to read a small set of the files with a glob and a predicate-pushdown clause, as follows:

ddf= dd.read_parquet("data/*.parquet", engine='pyarrow', gather_statistics=True, filters=[('STATION', '==', 'CA008202251'), ('ELEMENT', '==', 'TAVG')], columns=['TIME','ELEMENT','VALUE', 'STATION'])

I get an IndexError:

IndexError                                Traceback (most recent call last)
<timed exec> in <module>
/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py in read_parquet(path, columns, filters, categories, index, storage_options, engine, gather_statistics, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
314         gather_statistics = True
315 
--> 316     read_metadata_result = engine.read_metadata(
317         fs,
318         paths,
/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in read_metadata(cls, fs, paths, categories, index, gather_statistics, filters, split_row_groups, read_from_paths, chunksize, aggregate_files, **kwargs)
540             split_row_groups,
541             gather_statistics,
--> 542         ) = cls._gather_metadata(
543             paths,
544             fs,
/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in _gather_metadata(cls, paths, fs, split_row_groups, gather_statistics, filters, index, dataset_kwargs)
1786 
1787         # Step 1: Create a ParquetDataset object
-> 1788         dataset, base, fns = _get_dataset_object(paths, fs, filters, dataset_kwargs)
1789         if fns == [None]:
1790             # This is a single file. No danger in gathering statistics
/usr/local/lib/python3.9/site-packages/dask/dataframe/io/parquet/arrow.py in _get_dataset_object(paths, fs, filters, dataset_kwargs)
1740         if proxy_metadata:
1741             dataset.metadata = proxy_metadata
-> 1742     elif fs.isdir(paths[0]):
1743         # This is a directory.  We can let pyarrow do its thing.
1744         # Note: In the future, it may be best to avoid listing the
IndexError: list index out of range

I can load the parquet directories individually:

ddf= dd.read_parquet("data/2000.parquet", engine='pyarrow', gather_statistics=True, filters=[('STATION', '==', 'CA008202251'), ('ELEMENT', '==', 'TAVG')], columns=['TIME','ELEMENT','VALUE', 'STATION'])

Is it possible to read with globbing using dask/parquet/pyarrow?

When partition_cols is used in .to_parquet, the partitioned dataframes are saved in separate files, so in your case data/2000.parquet is likely a folder.

import pandas as pd
from os.path import isdir
# test dataframe
df = pd.DataFrame(range(3), columns=['a'])
df['b'] = df['a']
df['c'] = df['a']
# save without partitioning
df.to_parquet('test.parquet')
print(isdir('test.parquet')) # False
# save with partitioning
df.to_parquet('test_partitioned.parquet', partition_cols=['a', 'b'])
print(isdir('test_partitioned.parquet')) # True
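
To see the nested layout this produces, you can walk the partitioned folder (a quick sanity check; the leaf file names are generated automatically, so yours will differ):

import os

# One directory level per partition column, hive-style (col=value):
for root, _, files in os.walk('test_partitioned.parquet'):
    for f in files:
        print(os.path.join(root, f))
# e.g. test_partitioned.parquet/a=0/b=0/<uuid>.parquet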

As a workaround, constructing an explicit list of parquet files with os.walk or glob may be a good solution. Note that with more than one partition column there will be multiple levels of nested folders containing the parquet files, so a simple glob is not enough and you will need a recursive search, as sketched below.
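
A minimal sketch of that workaround under the question's layout (a data/ root whose yearly folders themselves end in ".parquet"):

import glob
import os
import dask.dataframe as dd

# Recursively match everything named *.parquet under the root, then keep
# only actual files: the yearly folders (data/2000.parquet, ...) also end
# in ".parquet", so the name match alone would include directories.
paths = glob.glob("data/**/*.parquet", recursive=True)
files = [p for p in paths if os.path.isfile(p)]

# dd.read_parquet also accepts an explicit list of paths.
ddf = dd.read_parquet(files, engine="pyarrow")

Be aware that when reading an explicit list of leaf files, the partition columns encoded in the directory names (STATION, ELEMENT) may not be reconstructed as columns, depending on the engine and version.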

Alternatively, you can construct a dask.dataframe for each year and then concatenate them with dd.concat.
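
A minimal sketch of that alternative, reusing the per-directory read that already works in the question (the year range here is hypothetical):

import dask.dataframe as dd

years = range(2000, 2010)  # hypothetical span of yearly folders
ddfs = [
    dd.read_parquet(
        f"data/{year}.parquet",
        engine="pyarrow",
        filters=[("STATION", "==", "CA008202251"), ("ELEMENT", "==", "TAVG")],
        columns=["TIME", "ELEMENT", "VALUE", "STATION"],
    )
    for year in years
]
ddf = dd.concat(ddfs)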

"data/*.parquet"部分可能是导致您出现问题的原因。您需要提供没有*的分区湖的根路径。

Here's a working example snippet:

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame(
    [
        ["north america", "mexico", "carlos"],
        ["asia", "india", "ram"],
        ["asia", "china", "li"],
    ],
    columns=["continent", "country", "first_name"],
)
ddf = dd.from_pandas(df, npartitions=2)
ddf.to_parquet(
    "tmp/partition/2", engine="pyarrow", partition_on=["continent", "country"]
)
ddf = dd.read_parquet(
    "tmp/partition/2",
    engine="pyarrow",
    filters=[("continent", "==", "asia"), ("country", "==", "china")],
)

Note that read_parquet is called on "tmp/partition/2", not on a directory path with an asterisk.
