减少使用聚合函数时 Athena 扫描的数据量



下面的查询扫描 100 MB 的数据。

select * from table where column1 = 'val' and partition_id = '20190309';

但是,以下查询扫描 15 GB 的数据(有超过 90 个分区)

select * from table where column1 = 'val' and partition_id in (select max(partition_id) from table);

如何优化第二个查询以扫描与第一个查询相同的数据量?

这里有两个问题。 上面标量子查询的效率select max(partition_id) from table,以及围绕动态过滤@PiotrFindeisen指出的。

第一个问题是,对 Hive 表的分区键的查询比它们看起来要复杂得多。 大多数人会认为,如果你想要分区键的最大值,你可以简单地对分区键执行查询,但这不起作用,因为 Hive 允许分区为空(并且它还允许不包含行的非空文件)。 具体来说,上面的标量子查询select max(partition_id) from table要求Trino(以前称为PrestoSQL)找到至少包含一行的最大分区。 理想的解决方案是在 Hive 中拥有完美的统计数据,但除此之外,引擎需要为 Hive 提供自定义逻辑,打开分区文件,直到找到非空文件。

如果您确定您的仓库不包含空分区(或者如果您可以接受其含义),则可以将标量子查询替换为隐藏$partitions表上的标量子查询">

select * 
from table 
where column1 = 'val' and 
partition_id = (select max(partition_id) from "table$partitions");

第二个问题是@PiotrFindeisen指出的,它与查询的计划和执行方式有关。 大多数人会查看上面的查询,看到引擎应该在计划期间明显找出select max(partition_id) from "table$partitions"的值,将其内联到计划中,然后继续优化。 不幸的是,这是一个非常复杂的决策,因此引擎只是将其建模为广播联接,其中一部分执行计算出该值,并将该值广播给其余工作线程。 问题是执行的其余部分无法将这些新信息添加到现有处理中,因此它只是扫描所有数据,然后过滤掉您尝试跳过的值。 有一个项目正在进行中,以添加此动态筛选,但尚未完成。

这意味着您今天能做的最好的事情就是运行两个单独的查询:一个用于获取最大partition_id,另一个用于获取内联值。

顺便说一句,在 Presto 0.199 中添加了隐藏的"$partitions"表,我们在 0.201 中修复了一些小错误。 我不确定 Athena 基于哪个版本,但我相信它已经过时了(我写这个答案时的当前版本是 309。

编辑:Presto 在其 0.193 版本中删除了__internal_partitions__表,因此我建议不要在任何生产系统中使用下面Slow aggregation queries for partition keys部分中定义的解决方案,因为 Athena "透明地"更新了 presto 版本。 我最终只是使用了幼稚的SELECT max(partition_date) ...查询,但也使用了Lack of Dynamic Filtering部分中概述的相同回溯技巧。 它比使用__internal_partitions__表慢大约 3 倍,但至少当 Athena 决定更新他们的 presto 版本时它不会中断。

----- 原始帖子-----

因此,我想出了一种相当黑客的方法来实现大型数据集上基于日期的分区,当您只需要查看几个分区的数据即可最大程度地匹配时,但是,请注意,我不是 100% 确定information_schema.__internal_partitions__表的使用有多脆。

如上所述@Dain,实际上有两个问题。 第一个是 max(partition_date) 查询的聚合有多慢,第二个是 Presto 缺乏对动态过滤的支持。

分区键的聚合查询速度慢

为了解决第一个问题,我正在使用information_schema.__internal_partitions__表,它允许我在表的分区上快速聚合,而无需扫描文件中的数据。(请注意,以下查询中的partition_valuepartition_keypartition_number都是__internal_partitions__表的列名,与表的列无关)

如果表只有一个分区键,则可以执行以下操作:

SELECT max(partition_value) FROM information_schema.__internal_partitions__
WHERE table_schema = 'DATABASE_NAME' AND table_name = 'TABLE_NAME'

但是,如果您有多个分区键,则需要更多类似的东西:

SELECT max(partition_date) as latest_partition_date from (
SELECT max(case when partition_key = 'partition_date' then partition_value end) as partition_date, max(case when partition_key = 'another_partition_key' then partition_value end) as another_partition_key
FROM information_schema.__internal_partitions__
WHERE table_schema = 'DATABASE_NAME' AND table_name = 'TABLE_NAME'
GROUP BY partition_number
)
WHERE
-- ... Filter down by values for e.g. another_partition_key
)

这些查询应该运行得相当快(我的运行大约需要 1-2 秒),而无需扫描文件中的实际数据,但同样,我不确定使用这种方法是否有任何陷阱。

缺乏动态过滤

对于我的特定用例,我能够减轻第二个问题的最坏影响,因为我希望在当前日期的有限时间内总会有一个分区(例如,我可以保证任何数据生产或分区加载问题将在 3 天内得到补救)。 事实证明,Athena 在使用 presto 的日期时间函数时确实会做一些预处理,因此动态过滤不会像使用子查询那样出现相同类型的问题。

因此,您可以使用 datetime 函数更改查询以限制它对实际最大值的回溯距离,以便限制扫描的数据量。

SELECT * FROM "DATABASE_NAME"."TABLE_NAME"
WHERE partition_date >= cast(date '2019-06-25' - interval '3' day as varchar) -- Will only scan partitions from 3 days before '2019-06-25'
AND partition_date = (
-- Insert the partition aggregation query from above here
)

我不知道它是否仍然相关,但刚刚发现:

而不是:

select * from table where column1 = 'val' and partition_id in (select max(partition_id) from table);

用:

select a.* from table a 
inner join (select max(partition_id) max_id from table) b on a.partition_id=b.max_id
where column1 = 'val';

我认为这与优化使用分区的连接有关。

相关内容

  • 没有找到相关文章

最新更新