What is the best way to consolidate fragments in a pyarrow dataset?



Is there a better way than the code below to accomplish my goals? The goals are:

  1. I want to read data from Dataset A based on a filter. This dataset contains many small fragments (because I download data frequently, there are many files in it).
  2. I want to consolidate the fragments by partition in a loop (I use this when I cannot fit all of the filters into memory, so I process them one at a time).
  3. I want to write this data to a new dataset (Dataset B) as a single consolidated file that is read by our BI tool. Unfortunately write_dataset has no partition_filename_cb, so I need to use the legacy write_to_dataset; the file is typically named after the partition (see the sketch after this list).
  4. I would really like to clean up Dataset A. Over time more and more files get added to each partition because I download data frequently and rows can be updated (some of these fragment files contain only one or two records).
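
For reference, the Dataset B write in step 3 is roughly the legacy call below. This is only a sketch: table stands for the consolidated table produced further down, the root path is taken from the log output, and use_legacy_dataset may or may not be needed depending on the pyarrow version.

import pyarrow.parquet as pq

#legacy writer: partition_filename_cb receives the partition key values for each
#output directory and returns the file name to use, e.g. 2019-11-13.parquet
pq.write_to_dataset(
    table,
    root_path="dev/interactions-final",
    partition_cols=["created_date"],
    partition_filename_cb=lambda keys: f"{keys[0]}.parquet",
    filesystem=filesystem,
    use_legacy_dataset=True,  #may be required on newer pyarrow versions
)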

Below is my current process. I use ds.Scanner to apply my filters and select my columns from the original dataset:
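
For completeness, the imports and the dataset / partitioning / filesystem setup that these snippets rely on look roughly like this (the paths, column names and cutoff date are placeholders, not the real values):

import datetime

import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import fs

#example setup only; replace the paths, columns and partition field with the real ones
filesystem = fs.LocalFileSystem()
partitioning = ds.partitioning(pa.schema([("created_date", pa.date32())]), flavor="hive")
dataset = ds.dataset(
    "dev/interactions", format="parquet", partitioning=partitioning, filesystem=filesystem
)
read_columns = ["interaction_id", "created_date"]
filter_expression = ds.field("created_date") >= datetime.date(2019, 11, 13)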

def retrieve_fragments(dataset, filter_expression, columns):
    """Creates a dictionary of file fragments and filters from a pyarrow dataset"""
    fragment_partitions = {}
    scanner = ds.Scanner.from_dataset(dataset, columns=columns, filter=filter_expression)
    fragments = scanner.get_fragments()
    for frag in fragments:
        keys = ds._get_partition_keys(frag.partition_expression)
        fragment_partitions[frag] = keys
    return fragment_partitions

Below I build small lists of all of the fragments that share the same filter expression. I can then write these out to a new dataset as a consolidated file, and I assume I could also delete the individual fragment files and write a new consolidated version?

fragments = retrieve_fragments(
    dataset=dataset, filter_expression=filter_expression, columns=read_columns
)
unique_filters = []
dfs = []
for fragment, filter_value in fragments.items():
    if filter_value not in unique_filters:
        unique_filters.append(filter_value)

#each chunk is a list of all of the fragments with the same partition_expression / filter,
#which we turn into a new dataset that we can then process or re-save into a consolidated file
for unique_filter in unique_filters:
    chunks = []
    for frag, filter_value in fragments.items():
        if filter_value == unique_filter:
            chunks.append(frag.path)
    logging.info(
        f"Combining {len(chunks)} fragments with filter {unique_filter} into a single table"
    )
    table = ds.dataset(chunks, partitioning=partitioning, filesystem=filesystem).to_table(columns=read_columns)

    #ignoring metadata due to some issues with columns having a boolean type even though they were never boolean
    df = table.to_pandas(ignore_metadata=True)
    #this function would just sort and drop duplicates on a unique constraint key
    df = prepare_dataframe(df)
    table = pa.Table.from_pandas(df=df, schema=dataset.schema, preserve_index=False)
    #write dataset to Dataset B (using partition_filename_cb)
    #I believe I could now also write the dataset back to Dataset A in a consolidated
    #parquet file and then delete all of the fragment.paths. This would leave me with
    #only a single file in the partition "folder" (see the sketch below)
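
For step 4, what I have in mind at the end of that loop is the sketch below. The file naming, the assumed Dataset A location (dev/interactions), and the delete calls are assumptions on my part rather than tested code:

import pyarrow.parquet as pq

#inside the loop above, once the consolidated table for unique_filter has been rebuilt:
partition_value = unique_filter["created_date"]
consolidated_path = (
    f"dev/interactions/created_date={partition_value}/{partition_value}.parquet"
)
with filesystem.open_output_stream(consolidated_path) as sink:
    pq.write_table(table, sink)

#remove the small fragment files that were merged into the consolidated file
for path in chunks:
    if path != consolidated_path:
        filesystem.delete_file(path)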

The output of this saves one file per partition to a new dataset (/dev/interactions-final/created_date=2019-11-13/2019-11-13.parquet):

INFO - Combining 78 fragments with filter {'created_date': datetime.date(2019, 11, 13)} into a single table
INFO - Saving 172657 rows and 36 columns (70.36 MB to dev/interactions-final)
INFO - Combining 57 fragments with filter {'created_date': datetime.date(2019, 11, 18)} into a single table
INFO - Saving 67036 rows and 36 columns (29.63 MB to dev/interactions-final)
INFO - Combining 55 fragments with filter {'created_date': datetime.date(2019, 11, 19)} into a single table
INFO - Saving 65035 rows and 36 columns (29.62 MB to dev/interactions-final)
INFO - Combining 63 fragments with filter {'created_date': datetime.date(2019, 11, 20)} into a single table
INFO - Saving 63613 rows and 36 columns (30.76 MB to dev/interactions-final)

Have you tried write_dataset (code here)? It repartitions the data, and I think it collects the small fragments in the process.
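
Roughly along these lines; note that basename_template and existing_data_behavior only exist in newer pyarrow releases, so treat this as a sketch rather than a drop-in:

import pyarrow.dataset as ds

#rewrite Dataset A partition by partition; with a fixed basename_template each
#partition directory typically ends up with a single consolidated part-0.parquet
ds.write_dataset(
    dataset,                                   #the fragmented Dataset A from the question
    base_dir="dev/interactions-consolidated",  #placeholder output location
    format="parquet",
    partitioning=partitioning,
    filesystem=filesystem,
    basename_template="part-{i}.parquet",
    existing_data_behavior="delete_matching",  #requires a recent pyarrow version
)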
