我遇到了从镶木地板文件加载dask数据帧的问题。
基本上,我将镶木地板文件分为以下几类:飞机名称(aircraft=名称_飞机(、渐进号(识别飞机每个任务的数字:渐进号=数字(、年、月和日。
当我试图将镶木地板文件读取到dask数据帧中时,我成功地过滤了年份窗口和渐进窗口,但未能仅选择一些飞机。这里报告了我用来读取拼花地板文件的功能
ddf = dd.read_parquet(path, engine="pyarrow", index=False, filters=filters)
其中path是文件的正确路径。parquet和filter是一个元组列表,其中包含我想要过滤的元素,例如:
filters = [('PROGRESSIVE', '>=', 0), ('PROGRESSIVE', '<=', 999), ('year', '>', 1999), ('year', '<', 2021), ('AIRCRAFT', '=', 'Aircraft-5')]
现在有了这种过滤器,一切都可以了,但如果我想选择多架飞机,或者,例如,不在同一范围窗口内的不同渐进号(比如说,仅753、800和883(,我无法正确加载数据帧。
例如,如果我设置
filters = [('PROGRESSIVE', '>=', 0), ('PROGRESSIVE', '<=', 999), ('year', '>', 1999), ('year', '<', 2021), ('AIRCRAFT', '=', 'Aircraft-4') ('AIRCRAFT', '=','Aircraft-5')]
那么加载的数据帧实际上是空的:len(ddf_filtered_demo.index(是0,而只选择一架飞机不是空的并且是正确的。
问题是我可以选择一系列值(<或>(,但不能只选择某些元素。
从拼花地板文件加载dask数据帧的正确方法是什么?只选择不属于唯一值范围的分区?
fastparquet接口支持in
,因此您可以执行
filters = [('PROGRESSIVE', 'in', [753, 80, 883]), ... )
我不知道arrow是否支持这种语法,你可以试试。
对你来说,这个特定的不起作用的例子听起来像是一个bug,你应该报告它。理想情况下,你可以用你在代码中创建的最小示例数据集来重新创建它。
下面是一个简单的例子,它适用于当前的fastparquet主分支(我碰巧在本地安装了它(。
# make data
df = pd.DataFrame({'a': [1, 2, 3, 4], 'b': list("abcd")})
df.to_parquet("out.parq", partition_on='a', engine="fastparquet")
# read
pd.read_parquet("out.parq", filters=[('a', 'in', [1, 2, 4])], engine='fastparquet')
由于OP的问题使用了pyarrow
:请注意,关于dd.read_parquet
的filters
夸尔格的Dask文档说:
"要应用的筛选器列表,如[[('Cl1','==',0(,…],…]。使用此参数不会导致对最终分区的逐行筛选,除非engine="pyarrow数据集";也被指定。对于其他引擎,过滤仅在分区级别执行,即防止加载某些行组和/或文件">
换句话说,当使用非pyarrow引擎时,filters
无法拉出满足筛选条件的特定行,除非拼花地板文件是用正确的分区键创建的(在上述答案的情况下为partition_on=a
(。
因此,尽管此代码适用于fastparquet
和pyarrow
引擎:
import dask.dataframe as dd
import pandas as pd
df = pd.DataFrame(
{"letter": ["a", "b", "c", "a", "a", "d"], "number": [1, 2, 3, 4, 5, 6]}
)
ddf = dd.from_pandas(df, npartitions=3)
ddf.to_parquet("tmp/partition_on.parquet", engine="fastparquet", partition_on="letter")
# write dummy df to parquet using partition_on kwarg
ddf_1 = dd.read_parquet(
"tmp/partition/1", engine="fastparquet", filters=[("letter", "==", "a")]
)
此代码仅适用于pyarrow
:
# write dummy df to parquet without partition_on kwarg
ddf.to_parquet("tmp/without_partition_on.parquet", engine="pyarrow")
ddf_2 = dd.read_parquet(
"tmp/non_partitioned_on.parquet", engine="pyarrow", filters=[("letter", "==", "a"), ("number", ">=", 2)]
)
上面的例子是OP代码的MCVE版本。这向我表明,要么你的数据质量出了问题,要么你应该报告一个错误。
将mdurant中的示例修改为特定于dask将是:
import dask.dataframe as dd
import pandas as pd
# make data
df = dd.from_pandas(pd.DataFrame({'a': [1, 2, 3, 4], 'b': list("abcd")}), npartitions=1)
df.to_parquet("out.parq", partition_on='a', engine="fastparquet")
# read
dd.read_parquet("out.parq", filters=[('a', 'in', (1, 2, 4))], engine='fastparquet').compute()
看起来dask会对过滤器(链接(进行扁平化,因此如果过滤器中有一个列表(当_flatten_filter()
函数试图构建一个集合时会发生这种情况(,则会出现类型错误。但是,如果您将值作为元组而不是列表提供,则过滤器可以正常工作。