Postgresql Prune从发件箱表中复制了数据



问题陈述

为了确保磁盘大小不会变得不必要,我希望能够删除已从发件箱表中复制的行。

上下文

Postgres处于v12

我们使用Kafka源连接器来流式传输对postgres表所做的更改。这些更改是仅插入,因此在写入kafka后不再需要。源连接器正在使用逻辑复制将更改流式传输到连接器,复制的状态可以显示在pg_replication_slot中。

查看pg_replication_slot时,您可以看到它存储的有用数据,以便了解它必须保留哪些日志,以确保客户端仍然可以进行复制。

例如,当我运行时:

select * from pg_replication_slots;

我可能会看到:

slot_name |  plugin  | slot_type | datoid |      database      | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+----------+-----------+--------+--------------------+-----------+--------+------------+------+--------------+-------------+---------------------
debezium  | wal2json | logical   |  26593 | database_name | f         | t      |       7404 |      |        26729 | 0/DCD98E8   | 0/DCD9920
(1 row)

我感兴趣的是,我是否可以可靠地使用该数据,然后使用表上的postgresql元数据来选择从该插槽复制的所有行。

例如,据我所知,这并不起作用,但理想情况下会返回已经复制的行,现在可以安全地从表中修剪:

select * from outbox where age(xmin) < (select age(catalog_xmin) from pg_replication_slots);

任何指导都将是甜蜜的!干杯

我一直在MySQL中使用Debezium实现发件箱模式,并在插入发件箱记录后立即删除它,我在这里看到了这一点https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/插入将被拾取并发送,删除将被忽略。因此,从本质上讲,发件箱表中不应该有任何内容(在事务之外(。我还预生成了条目的主键(用于Kafka中的事件ID(,这样我就可以批量插入和删除。

回到这一点,我不得不以不同的方式思考如何将复制进度与发件箱表联系起来。之前在我的问题中,我试图从pg_replication_slot中收集进度,但在这个工作示例中,我改用了pg_stat_replication。我们关心的slot_name可以查询该表,并可以返回滞后结果。例如:

SELECT * FROM outbox WHERE created_at < (SELECT(NOW() - COALESCE(replay_lag, interval '60 seconds')) as stale_time from pg_stat_replication where pg_stat_replication.slot_name = 'outbox_slot');

因此,在这里,这将返回发件箱表中在重播时间或1分钟之外插入的行。

相关内容

  • 没有找到相关文章

最新更新