Is there a better way to accomplish my goals with the code below?
- I want to read data from dataset A based on a filter. This dataset contains many small fragment files (because I download data frequently, many files accumulate in it).
- I want to consolidate the fragments partition by partition in a loop (I do this because I cannot fit all of the filters into memory at once, so I process them one at a time).
- I want to write this data to a new dataset (dataset B) as a single consolidated file per partition, which is read by our BI tool. Unfortunately the newer API has no partition_filename_cb function, so I need to use the legacy write_to_dataset. The file is typically named after the partition.
- I would really like to clean up dataset A. Over time, more and more files get added to each partition because I download data frequently and rows can be updated (some of these fragment files contain only one or two records).
Below is my current process. I use ds.Scanner to apply my filter and select my columns from the original dataset:
def retrieve_fragments(dataset, filter_expression, columns):
    """Creates a dictionary of file fragments and filters from a pyarrow dataset"""
    fragment_partitions = {}
    scanner = ds.Scanner.from_dataset(dataset, columns=columns, filter=filter_expression)
    fragments = scanner.get_fragments()
    for frag in fragments:
        # note: _get_partition_keys is a private API; recent pyarrow versions
        # expose a public ds.get_partition_keys with the same behavior
        keys = ds._get_partition_keys(frag.partition_expression)
        fragment_partitions[frag] = keys
    return fragment_partitions
Below I create small lists of all the fragments that share the same filter expression. I can then write these to a new dataset as a consolidated file, and I assume I could also delete the individual fragment files and write a new consolidated version?
fragments = retrieve_fragments(
    dataset=dataset, filter_expression=filter_expression, columns=read_columns
)

unique_filters = []
for fragment, filter_value in fragments.items():
    if filter_value not in unique_filters:
        unique_filters.append(filter_value)

# each chunk is a list of all of the fragments with the same
# partition_expression / filter, which we turn into a new dataset that we
# can then process or resave into a consolidated file
for unique_filter in unique_filters:
    chunks = []
    for frag, filter_value in fragments.items():
        if filter_value == unique_filter:
            chunks.append(frag.path)

    logging.info(
        f"Combining {len(chunks)} fragments with filter {unique_filter} into a single table"
    )
    table = ds.dataset(
        chunks, partitioning=partitioning, filesystem=filesystem
    ).to_table(columns=read_columns)

    # ignoring metadata due to some issues with columns having a boolean
    # type even though they were never boolean
    df = table.to_pandas(ignore_metadata=True)

    # this function would just sort and drop duplicates on a unique constraint key
    df = prepare_dataframe(df)

    table = pa.Table.from_pandas(df=df, schema=dataset.schema, preserve_index=False)

    # write dataset to Dataset B (using partition_filename_cb)

    # I believe I could now also write the dataset back to Dataset A in a
    # consolidated parquet file and then delete all of the fragment paths.
    # This would leave me with only a single file in the partition "folder"
The output of this process saves one file per partition to the new dataset (e.g. /dev/interactions-final/created_date=2019-11-13/2019-11-13.parquet):
INFO - Combining 78 fragments with filter {'created_date': datetime.date(2019, 11, 13)} into a single table
INFO - Saving 172657 rows and 36 columns (70.36 MB to dev/interactions-final)
INFO - Combining 57 fragments with filter {'created_date': datetime.date(2019, 11, 18)} into a single table
INFO - Saving 67036 rows and 36 columns (29.63 MB to dev/interactions-final)
INFO - Combining 55 fragments with filter {'created_date': datetime.date(2019, 11, 19)} into a single table
INFO - Saving 65035 rows and 36 columns (29.62 MB to dev/interactions-final)
INFO - Combining 63 fragments with filter {'created_date': datetime.date(2019, 11, 20)} into a single table
INFO - Saving 63613 rows and 36 columns (30.76 MB to dev/interactions-final)
Have you tried write_dataset (code here)? It repartitions, and I believe it will collect the small fragments in the process.