如何在Dask read_parquet函数中过滤不同的分区



我遇到了从镶木地板文件加载dask数据帧的问题。

基本上,我将镶木地板文件分为以下几类:飞机名称(aircraft=名称_飞机(、渐进号(识别飞机每个任务的数字:渐进号=数字(、年、月和日。

当我试图将镶木地板文件读取到dask数据帧中时,我成功地过滤了年份窗口和渐进窗口,但未能仅选择一些飞机。这里报告了我用来读取拼花地板文件的功能

ddf = dd.read_parquet(path, engine="pyarrow", index=False, filters=filters)

其中path是文件的正确路径。parquet和filter是一个元组列表,其中包含我想要过滤的元素,例如:

filters = [('PROGRESSIVE', '>=', 0), ('PROGRESSIVE', '<=', 999), ('year', '>', 1999), ('year', '<', 2021), ('AIRCRAFT', '=', 'Aircraft-5')]

现在有了这种过滤器,一切都可以了,但如果我想选择多架飞机,或者,例如,不在同一范围窗口内的不同渐进号(比如说,仅753、800和883(,我无法正确加载数据帧。

例如,如果我设置

filters = [('PROGRESSIVE', '>=', 0), ('PROGRESSIVE', '<=', 999), ('year', '>', 1999), ('year', '<', 2021), ('AIRCRAFT', '=', 'Aircraft-4') ('AIRCRAFT', '=','Aircraft-5')]

那么加载的数据帧实际上是空的:len(ddf_filtered_demo.index(是0,而只选择一架飞机不是空的并且是正确的。

问题是我可以选择一系列值(<或>(,但不能只选择某些元素。

从拼花地板文件加载dask数据帧的正确方法是什么?只选择不属于唯一值范围的分区?

fastparquet接口支持in,因此您可以执行

filters = [('PROGRESSIVE', 'in', [753, 80, 883]), ... )

我不知道arrow是否支持这种语法,你可以试试。

对你来说,这个特定的不起作用的例子听起来像是一个bug,你应该报告它。理想情况下,你可以用你在代码中创建的最小示例数据集来重新创建它。

下面是一个简单的例子,它适用于当前的fastparquet主分支(我碰巧在本地安装了它(。

# make data
df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': list("abcd")})
df.to_parquet("out.parq", partition_on='a', engine="fastparquet")
# read
pd.read_parquet("out.parq", filters=[('a', 'in', [1, 2, 4])], engine='fastparquet')

由于OP的问题使用了pyarrow:请注意,关于dd.read_parquetfilters夸尔格的Dask文档说:

"要应用的筛选器列表,如[[('Cl1','==',0(,…],…]。使用此参数不会导致对最终分区的逐行筛选,除非engine="pyarrow数据集";也被指定。对于其他引擎,过滤仅在分区级别执行,即防止加载某些行组和/或文件">

换句话说,当使用非pyarrow引擎时,filters无法拉出满足筛选条件的特定行,除非拼花地板文件是用正确的分区键创建的(在上述答案的情况下为partition_on=a(。

因此,尽管此代码适用于fastparquetpyarrow引擎:

import dask.dataframe as dd
import pandas as pd 
df = pd.DataFrame(
{"letter": ["a", "b", "c", "a", "a", "d"], "number": [1, 2, 3, 4, 5, 6]}
)
ddf = dd.from_pandas(df, npartitions=3)
ddf.to_parquet("tmp/partition_on.parquet", engine="fastparquet", partition_on="letter")
# write dummy df to parquet using partition_on kwarg
ddf_1 = dd.read_parquet(
"tmp/partition/1", engine="fastparquet", filters=[("letter", "==", "a")]
)

此代码仅适用于pyarrow:

# write dummy df to parquet without partition_on kwarg
ddf.to_parquet("tmp/without_partition_on.parquet", engine="pyarrow")
ddf_2 = dd.read_parquet(
"tmp/non_partitioned_on.parquet", engine="pyarrow", filters=[("letter", "==", "a"), ("number", ">=", 2)]
)

上面的例子是OP代码的MCVE版本。这向我表明,要么你的数据质量出了问题,要么你应该报告一个错误。

将mdurant中的示例修改为特定于dask将是:

import dask.dataframe as dd
import pandas as pd
# make data
df = dd.from_pandas(pd.DataFrame({'a': [1, 2, 3, 4], 'b': list("abcd")}), npartitions=1)
df.to_parquet("out.parq", partition_on='a', engine="fastparquet") 
# read
dd.read_parquet("out.parq", filters=[('a', 'in', (1, 2, 4))], engine='fastparquet').compute()

看起来dask会对过滤器(链接(进行扁平化,因此如果过滤器中有一个列表(当_flatten_filter()函数试图构建一个集合时会发生这种情况(,则会出现类型错误。但是,如果您将值作为元组而不是列表提供,则过滤器可以正常工作。

最新更新