优化海量表聚合查询,刷新物化视图



假设我有以下PostgreSQL数据库模式:

Group
id: int
Task:
id: int
created_at: datetime
group: FK Group

我有以下具体化视图来计算任务数和每组的最新Task.created_at值:

CREATE MATERIALIZED VIEW group_statistics AS (
SELECT
group.id as group_id,
MAX(task.created_at) AS latest_task_created_at,
COUNT(task.id) AS task_count
FROM group
LEFT OUTER JOIN task ON (group.id = task.group_id)
GROUP BY group.id
);

Task表当前有 2000 万条记录,因此刷新此具体化视图需要很长时间(20-30 秒)。自从我们开始每 10 分钟刷新一次具体化以来,我们也遇到了一些短暂但主要的数据库性能问题,即使使用 CONWHILE:

REFRESH MATERIALIZED VIEW CONCURRENTLY group_statistics;

有没有更高性能的方法来计算这些值?请注意,它们不需要精确。近似值完全没问题,例如latest_task_created_at可能会延迟 10-20 分钟。

我正在考虑在每次写入Task表时缓存这些值。无论是在 Redis 中还是在 PostgreSQL 本身中。

更新

人们正在请求执行计划。EXPLAIN不适用于REFRESH但我在实际查询上运行了EXPLAIN。请注意,它与我上面的理论数据模型不同。在这种情况下,DatabaseGroupRecordTask。另请注意,我在PostgreSQL 12.10上。

EXPLAIN (analyze, buffers, verbose)
SELECT
store_database.id as database_id,
MAX(store_record.updated_at) AS latest_record_updated_at,
COUNT(store_record.id) AS record_count
FROM store_database
LEFT JOIN store_record ON (store_database.id = store_record.database_id)
GROUP BY store_database.id;

输出:

HashAggregate  (cost=1903868.71..1903869.22 rows=169 width=32) (actual time=18227.016..18227.042 rows=169 loops=1)
"  Output: store_database.id, max(store_record.updated_at), count(store_record.id)"
Group Key: store_database.id
Buffers: shared hit=609211 read=1190704
I/O Timings: read=3385.027
->  Hash Right Join  (cost=41.28..1872948.10 rows=20613744 width=40) (actual time=169.766..14572.558 rows=20928339 loops=1)
"        Output: store_database.id, store_record.updated_at, store_record.id"
Inner Unique: true
Hash Cond: (store_record.database_id = store_database.id)
Buffers: shared hit=609211 read=1190704
I/O Timings: read=3385.027
->  Seq Scan on public.store_record  (cost=0.00..1861691.23 rows=20613744 width=40) (actual time=0.007..8607.425 rows=20928316 loops=1)
"              Output: store_record.id, store_record.key, store_record.data, store_record.created_at, store_record.updated_at, store_record.database_id, store_record.organization_id, store_record.user_id"
Buffers: shared hit=609146 read=1190704
I/O Timings: read=3385.027
->  Hash  (cost=40.69..40.69 rows=169 width=16) (actual time=169.748..169.748 rows=169 loops=1)
Output: store_database.id
Buckets: 1024  Batches: 1  Memory Usage: 16kB
Buffers: shared hit=65
->  Index Only Scan using store_database_pkey on public.store_database  (cost=0.05..40.69 rows=169 width=16) (actual time=0.012..0.124 rows=169 loops=1)
Output: store_database.id
Heap Fetches: 78
Buffers: shared hit=65
Planning Time: 0.418 ms
JIT:
Functions: 14
"  Options: Inlining true, Optimization true, Expressions true, Deforming true"
"  Timing: Generation 2.465 ms, Inlining 15.728 ms, Optimization 92.852 ms, Emission 60.694 ms, Total 171.738 ms"
Execution Time: 18229.600 ms

请注意,执行时间较长。有时需要 5-10 分钟才能运行。我很想将其减少到最多几秒钟。

更新 #2

当查询需要几分钟时,人们正在请求执行计划。在这里:

HashAggregate  (cost=1905790.10..1905790.61 rows=169 width=32) (actual time=128442.799..128442.825 rows=169 loops=1)
"  Output: store_database.id, max(store_record.updated_at), count(store_record.id)"
Group Key: store_database.id
Buffers: shared hit=114011 read=1685876 dirtied=367
I/O Timings: read=112953.619
->  Hash Right Join  (cost=15.32..1874290.39 rows=20999810 width=40) (actual time=323.497..124809.521 rows=21448762 loops=1)
"        Output: store_database.id, store_record.updated_at, store_record.id"
Inner Unique: true
Hash Cond: (store_record.database_id = store_database.id)
Buffers: shared hit=114011 read=1685876 dirtied=367
I/O Timings: read=112953.619
->  Seq Scan on public.store_record  (cost=0.00..1862849.43 rows=20999810 width=40) (actual time=0.649..119522.406 rows=21448739 loops=1)
"              Output: store_record.id, store_record.key, store_record.data, store_record.created_at, store_record.updated_at, store_record.database_id, store_record.organization_id, store_record.user_id"
Buffers: shared hit=113974 read=1685876 dirtied=367
I/O Timings: read=112953.619
->  Hash  (cost=14.73..14.73 rows=169 width=16) (actual time=322.823..322.824 rows=169 loops=1)
Output: store_database.id
Buckets: 1024  Batches: 1  Memory Usage: 16kB
Buffers: shared hit=37
->  Index Only Scan using store_database_pkey on public.store_database  (cost=0.05..14.73 rows=169 width=16) (actual time=0.032..0.220 rows=169 loops=1)
Output: store_database.id
Heap Fetches: 41
Buffers: shared hit=37
Planning Time: 5.390 ms
JIT:
Functions: 14
"  Options: Inlining true, Optimization true, Expressions true, Deforming true"
"  Timing: Generation 1.306 ms, Inlining 82.966 ms, Optimization 176.787 ms, Emission 62.561 ms, Total 323.620 ms"
Execution Time: 128474.490 ms

您的 MV 当前有 169 行,因此写入成本可以忽略不计(除非您有锁定问题)。这一切都与大桌子上昂贵的顺序扫描有关。

完整计数很慢

获取每个组("数据库")的精确计数是昂贵的。在Postgres中没有灵丹妙药。Postgres必须计算所有行。如果表是全可见的(可见性地图是最新的),Postgres 可以通过仅遍历覆盖索引来稍微缩短该过程。(您没有提供索引...

有一些可能的快捷方式可以估计整个表中的总行数。但是,每个组都不容易获得相同的内容。看:

  • 在PostgreSQL中发现表的行数的快速方法

但没那么

也就是说,您的查询仍然可以大大加快。联接前聚合:

SELECT id AS database_id
, r.latest_record_updated_at
, COALESCE(r.record_count, 0) AS record_count
FROM   store_database d
LEFT   JOIN (
SELECT r.database_id AS id
, max(r.updated_at) AS latest_record_updated_at
, count(*) AS record_count
FROM   store_record r
GROUP  BY 1
) r USING (id);

看:

  • 使用 LEFT JOIN 的查询不返回计数为 0 的行

并使用稍快(在这种情况下是等效的)count(*).相关:

  • PostgreSQL:按分钟运行查询的行数

此外 - 提供可见性 -count(*)可以使用任何非部分索引,最好是最小的索引,而count(store_record.id)仅限于该列上的索引(并且还必须检查值)。

I/O 是您的瓶颈

您为昂贵的执行添加了EXPLAIN计划,并且飞涨的 I/O 成本脱颖而出。它主导了查询的成本.
快速计划:

Buffers: shared hit=609146 read=1190704
I/O Timings: read=3385.027

慢速计划:

Buffers: shared hit=113974 read=1685876 dirtied=367
I/O Timings: read=112953.619

您的Seq Scan on public.store_record在读取数据文件块上花费了 112953.619 毫秒。 367 个脏缓冲区表示不到 3MB,仅占总 I/O 的一小部分。无论哪种方式,I/O 都主导着成本。
要么是存储系统非常慢,要么更有可能是因为快速查询的 I/O 成本降低了 30 倍,因此并发工作负载(在配置不当的系统上)对 I/O 的争用过多。以下一项或多项可以帮助:

  • 更快的存储
  • 更好(更合适)的服务器配置
  • 更多
  • RAM(以及允许更多缓存的服务器配置)
  • 更少的并发工作负载
  • 更高效的表设计,磁盘占用空间更小
  • 更智能的查询,需要读取更少的数据块
  • 升级到当前版本的 Postgres

无计数的快得多

如果没有计数,只需latest_record_updated_at,此查询将立即提供:

SELECT d.id
, (SELECT r.updated_at
FROM   store_record r
WHERE  r.database_id = d.id
ORDER  BY r.updated_at DESC NULLS LAST
LIMIT  1) AS latest_record_updated_at
FROM   store_database d;

结合匹配索引!理想:

CREATE INDEX store_record_database_id_idx ON store_record (database_id, updated_at DESC NULL LAST);

看:

  • 优化分组依据查询以检索每个用户的最新行

相同的索引也可以帮助完成上面的完整查询,即使没有那么显着。如果表的清空足够多(可见性地图是最新的),Postgres 可以对较小的索引执行顺序扫描,而无需涉及较大的表。显然,对于更宽的表行更重要 - 尤其是缓解您的 I/O 问题。 (当然,索引维护也会增加成本...

升级以使用并行性

如果可能的话,升级到最新版本的 Postgres。Postgres 14 或 15 与 Postgres 12 相比,获得了各种性能改进。最重要的是,引用Postgres 14的发行说明:

允许REFRESH MATERIALIZED VIEW使用并行性(巴拉特·鲁皮雷迪)

对于您的用例来说可能是巨大的。相关:

  • 并行实例化视图刷新

估计?

警告:实验性的东西。

你说:

近似值完全没问题

我在查询计划中只看到 169 个组("数据库")。Postgres 维护列统计信息。虽然组的不同计数很小,并且低于列store_record.database_id的"统计目标"(您必须确保!),但我们可以使用它。看:

  • 如何查看分析使用的统计目标?

除非您有非常激进的autovacuum设置,否则为了获得更好的估计值,请在运行下面的查询之前database_id上运行ANALYZE以更新列统计信息。(还更新了pg_class中的reltuplesrelpages):

ANALYZE public.store_record(database_id);

甚至(还要更新上述查询的可见性地图):

VACUUM ANALYZE public.store_record(database_id);

这是最昂贵的部分(有附带好处)。它是可选的。

WITH ct(total_est) AS ( 
SELECT reltuples / relpages * (pg_relation_size(oid) / 8192)
FROM   pg_class
WHERE  oid = 'public.store_record'::regclass  -- your table here
)
SELECT v.database_id, (ct.total_est * v.freq)::bigint AS estimate
FROM   pg_stats s
,  ct
,  unnest(most_common_vals::text::int[], most_common_freqs) v(database_id, freq)
WHERE  s.schemaname = 'public'
AND    s.tablename = 'store_record'
AND    s.attname = 'database_id';

该查询依赖于各种 Postgres 内部,并且可能会在未来的主要版本中中断(尽管不太可能)。使用 Postgres 14 进行测试,但也适用于 Postgres 12。这基本上是黑魔法。你需要知道你在做什么。您已被警告.
但查询几乎为零。

从上面的快速查询中获取latest_record_updated_at的精确值,并联接到计数的这些估计值。

基本说明:Postgres 在系统目录pg_statistic中维护列统计信息。pg_stats是视图,更易于访问。除此之外,还收集了"最常见的值"及其相对频率。以most_common_valsmost_common_freqs表示。乘以当前(估计的)总数,我们得到每组的估计值。您可以手动完成所有这些操作,但Postgres可能在这方面更快更好。

有关总估计值ct.total_est计算,请参阅:

  • 在PostgreSQL中发现表的行数的快速方法

(请注意此查询的"安全和显式"表单。

给定解释计划,顺序扫描似乎导致了缓慢。索引绝对可以提供帮助。

您还可以使用仅索引扫描,因为查询中的列很少。因此,您可以将这样的东西用于store_record表。

Create index idx_store_record_db_id btree(database_id) include (id, updated_at); 

还需要store_database表上的 id 列上的索引。

Create index idx_db_id on store_database btree(id)

有时在这种情况下,有必要考虑完全不同的业务逻辑解决方案。 例如,计数操作是一个非常慢的查询。这在数据库中无法通过任何方式加速。在这种情况下可以做些什么?由于我不了解您的业务逻辑的全部细节,因此我将告诉您几个选项。但是,这些选项也有缺点。例如:

group_id    id
---------------
1           12
1           145
1           100
3           652
3           102

我们将其分组一次,然后将数字插入表格中。

group_id    count_id
--------------------
1           3
3           2

之后,当每条记录插入主表时,我们使用触发器更新组表。像这样:

update group_table set count_id = count_id + 1 where group_id = new.group_id

或者像这样:

update group_table set count_id = (select count(id) from main_table where group_id = new.group_id)

我在这里不是在谈论小细节。为了正确更新行,我们可以使用子句for update,因此for update锁定其他事务的行。

因此,主要的解决方案是:像count这样的函数需要对分组数据单独执行,而不是一次在整个表上执行。可以应用类似的解决方案。我解释了一下,以便一般理解。 此解决方案的缺点是:如果在此主表上有许多插入操作,则插入性能将降低。

并行计划

如果您首先收集store_record统计信息,然后将其与store_database联接,您将获得更好的、可并行的计划。

EXPLAIN (analyze, buffers, verbose)
SELECT
store_database.id          as database_id,
s.latest_record_updated_at as latest_record_updated_at,
coalesce(s.record_count,0) as record_count
FROM store_database
LEFT JOIN 
(   SELECT
store_record.database_id     as database_id,
MAX(store_record.updated_at) as latest_record_updated_at,
COUNT(store_record.id)       as record_count
FROM store_record
GROUP BY store_record.database_id)
AS s ON (store_database.id = s.database_id);

这是一个演示 - 最后你可以看到两个查询返回完全相同的结果,但我建议的那个运行速度更快,计划更灵活。派遣的工人数量取决于您的max_worker_processesmax_parallel_workersmax_parallel_workers_per_gather设置以及计划器内部的一些附加逻辑。

store_record中的行越多,差异将更加明显。在我的测试行数为 4000 万的系统上,一名工作人员从 14 秒下降到 3 秒,当它限制派遣 16 名可用工人中的 6 名时,它为 1.4 秒。

<小时 />

缓存

我正在考虑在每次写入任务表时缓存这些值。无论是在 Redis 中还是在 PostgreSQL 本身中。

如果这是一个选项,则值得一试 - 您可以保持适当的准确性和即时可用的统计信息,但代价是某些(可递延的)表吞吐量开销。您可以将materialized view替换为常规表格,也可以将统计信息列添加到store_database

create table store_record_statistics(
database_id smallint unique references store_database(id) 
on update cascade,
latest_record_updated_at timestamptz,
record_count integer default 0);
insert into store_record_statistics --initializes table with view definition
SELECT g.id, MAX(s.updated_at), COUNT(*)
FROM store_database g LEFT JOIN store_record s ON g.id = s.database_id
GROUP BY g.id;
create index store_record_statistics_idx 
on store_record_statistics (database_id) 
include (latest_record_updated_at,record_count);
cluster verbose store_record_statistics using store_record_statistics_idx;

并将表保持最新状态保留为每次更改时触发store_recordtrigger

CREATE FUNCTION maintain_store_record_statistics_trigger() 
RETURNS TRIGGER LANGUAGE plpgsql AS
$$ BEGIN
IF TG_OP IN ('UPDATE', 'DELETE') THEN --decrement and find second most recent updated_at
UPDATE store_record_statistics srs
SET (record_count,
latest_record_updated_at) 
=   (record_count - 1,
(SELECT s.updated_at
FROM   store_record s
WHERE  s.database_id = srs.database_id
ORDER  BY s.updated_at DESC NULLS LAST
LIMIT  1))
WHERE database_id = old.database_id;
END IF;
IF TG_OP in ('INSERT','UPDATE') THEN --increment and pick most recent updated_at
UPDATE store_record_statistics 
SET (record_count,
latest_record_updated_at) 
=   (record_count + 1,
greatest(
latest_record_updated_at,
new.updated_at))
WHERE database_id=new.database_id;
END IF;
RETURN NULL;
END $$;

创建触发器deferrable将其执行时间与主操作分离,但它仍将在事务结束时推断其成本。

CREATE CONSTRAINT TRIGGER maintain_store_record_statistics
AFTER INSERT OR UPDATE OF database_id OR DELETE ON store_record 
INITIALLY DEFERRED FOR EACH ROW
EXECUTE PROCEDURE maintain_store_record_statistics_trigger();

TRUNCATE触发器不能与其他事件FOR EACH ROW声明,因此必须单独定义

CREATE FUNCTION maintain_store_record_statistics_truncate_trigger() 
RETURNS TRIGGER LANGUAGE plpgsql AS
$$ BEGIN
update store_record_statistics 
set (record_count, latest_record_updated_at)
= (0           , null);--wipes/resets all stats
RETURN NULL;
END $$;
CREATE TRIGGER maintain_store_record_statistics_truncate
AFTER TRUNCATE ON store_record
EXECUTE PROCEDURE maintain_store_record_statistics_truncate_trigger();

在我的测试中,更新或删除 1 亿行表中的 10000 个随机行在几秒钟内运行。单个插入 1000 个随机生成的新行需要 25 毫秒(不使用触发器)和 200 毫秒。一百万是30多岁零3分钟。

演示。


分区支持的并行计划

store_record可能非常适合分区:

create table store_record(
id serial not null,
updated_at timestamptz default now(),
database_id smallint references store_database(id)
) partition by range (database_id);
DO $$
declare 
vi_database_max_id           smallint:=0;
vi_database_id               smallint:=0;
vi_database_id_per_partition smallint:=40;--tweak for lower/higher granularity
begin
select max(id) from store_database into vi_database_max_id;
for vi_database_id in 1 .. vi_database_max_id by vi_database_id_per_partition loop
execute format ('
drop table if exists store_record_p_%1$s;
create table store_record_p_%1$s
partition of store_record
for VALUES from (%1$s) to (%1$s + %2$s)
with (parallel_workers=16);
', vi_database_id, vi_database_id_per_partition);
end loop;
end $$ ;

以这种方式拆分对象可以让计划人员相应地拆分其扫描,这最适合并行工作线程,但不需要它们。即使是视图后面的初始未更改查询也能够利用此结构:

HashAggregate  (cost=60014.27..60041.47 rows=2720 width=18) (actual time=910.657..910.698 rows=169 loops=1)
  Output: store_database.id, max(store_record_p_1.updated_at), count(store_record_p_1.id)
  Group Key: store_database.id
  Buffers: shared hit=827 read=9367 dirtied=5099 written=4145
  ->  Hash Right Join  (cost=71.20..45168.91 rows=1979382 width=14) (actual time=0.064..663.603 rows=1600020 loops=1)
        Output: store_database.id, store_record_p_1.updated_at, store_record_p_1.id
        Inner Unique: true
        Hash Cond: (store_record_p_1.database_id = store_database.id)
        Buffers: shared hit=827 read=9367 dirtied=5099 written=4145
        ->  Append  (cost=0.00..39893.73 rows=1979382 width=14) (actual time=0.014..390.152 rows=1600000 loops=1)
              Buffers: shared hit=826 read=9367 dirtied=5099 written=4145
              ->  Seq Scan on public.store_record_p_1  (cost=0.00..8035.02 rows=530202 width=14) (actual time=0.014..77.130 rows=429068 loops=1)
                    Output: store_record_p_1.updated_at, store_record_p_1.id, store_record_p_1.database_id
                    Buffers: shared read=2733 dirtied=1367 written=1335
              ->  Seq Scan on public.store_record_p_41  (cost=0.00..8067.36 rows=532336 width=14) (actual time=0.017..75.193 rows=430684 loops=1)
                    Output: store_record_p_41.updated_at, store_record_p_41.id, store_record_p_41.database_id
                    Buffers: shared read=2744 dirtied=1373 written=1341
              ->  Seq Scan on public.store_record_p_81  (cost=0.00..8029.14 rows=529814 width=14) (actual time=0.017..74.583 rows=428682 loops=1)
                    Output: store_record_p_81.updated_at, store_record_p_81.id, store_record_p_81.database_id
                    Buffers: shared read=2731 dirtied=1366 written=1334
              ->  Seq Scan on public.store_record_p_121  (cost=0.00..5835.90 rows=385090 width=14) (actual time=0.016..45.407 rows=311566 loops=1)
                    Output: store_record_p_121.updated_at, store_record_p_121.id, store_record_p_121.database_id
                    Buffers: shared hit=826 read=1159 dirtied=993 written=135
              ->  Seq Scan on public.store_record_p_161  (cost=0.00..29.40 rows=1940 width=14) (actual time=0.008..0.008 rows=0 loops=1)
                    Output: store_record_p_161.updated_at, store_record_p_161.id, store_record_p_161.database_id
        ->  Hash  (cost=37.20..37.20 rows=2720 width=2) (actual time=0.041..0.042 rows=169 loops=1)
              Output: store_database.id
              Buckets: 4096  Batches: 1  Memory Usage: 38kB
              Buffers: shared hit=1
              ->  Seq Scan on public.store_database  (cost=0.00..37.20 rows=2720 width=2) (actual time=0.012..0.021 rows=169 loops=1)
                    Output: store_database.id
                    Buffers: shared hit=1
Planning Time: 0.292 ms
Execution Time: 910.811 ms

演示。最好测试哪个粒度可在设置上提供最佳性能。您甚至可以测试子分割,为每个store_record.database_id提供一个分区,然后将其子分区为日期范围,从而简化对最新条目的访问。

物化视图不是一个好主意......

如果您只想"计算任务数量和每组的最新Task.created_at值">,那么我建议您简单地:

group表中添加两列:

ALTER TABLE IF EXISTS "group" ADD COLUMN IF NOT EXISTS task_count integer SET DEFAULT 0 ;
ALTER TABLE IF EXISTS "group" ADD COLUMN IF NOT EXISTS last_created_date timestamp ; -- instead of datetime which does not really exist in postgres ...

从表task上定义的触发器更新这 2 列:

CREATE OR REPLACE FUNCTION task_insert() RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
UPDATE "group" AS g
SET task_count = count + 1
, last_created_at = NEW.created_at -- assuming that the last task inserted has the latest created_at datetime of the group, if not, then reuse the solution proposed in task_delete()
WHERE g.id = NEW.group ;
RETURN NEW ;
END ; $$ ;
CREATE OR REPLACE TRIGGER task_insert AFTER INSERT ON task
FOR EACH ROW EXECUTE FUNCTION task_insert () ;
CREATE OR REPLACE FUNCTION task_delete () RETURNS trigger LANGUAGE plpgsql AS $$
BEGIN
UPDATE "group" AS g
SET task_count = count - 1
, last_created_at = u.last_created_at
FROM 
( SELECT max(created_at) AS last_created_at
FROM task
WHERE t.group = OLD.group
) AS u
WHERE g.id = OLD.group ;
RETURN OLD ;
END ; $$ ;
CREATE OR REPLACE TRIGGER task_insert AFTER DELETE ON task
FOR EACH ROW EXECUTE FUNCTION task_delete () ;

您将需要在开始时执行设置操作...

UPDATE "group" AS g
SET task_count = ref.count
, last_created_date = ref.last_created_at
FROM
( SELECT group
, max(created_at) AS last_created_at
, count(*) AS count
FROM task
GROUP BY group
) AS ref
WHERE g.id= ref.group ;

。但是,您将不再遇到查询的性能问题!!

SELECT * FROM "group"

您将优化数据库的大小...

最新更新