问题陈述
为了确保磁盘大小不会变得不必要,我希望能够删除已从发件箱表中复制的行。
上下文
Postgres处于v12
我们使用Kafka源连接器来流式传输对postgres表所做的更改。这些更改是仅插入,因此在写入kafka后不再需要。源连接器正在使用逻辑复制将更改流式传输到连接器,复制的状态可以显示在pg_replication_slot中。
查看pg_replication_slot时,您可以看到它存储的有用数据,以便了解它必须保留哪些日志,以确保客户端仍然可以进行复制。
例如,当我运行时:
select * from pg_replication_slots;
我可能会看到:
slot_name | plugin | slot_type | datoid | database | temporary | active | active_pid | xmin | catalog_xmin | restart_lsn | confirmed_flush_lsn
-----------+----------+-----------+--------+--------------------+-----------+--------+------------+------+--------------+-------------+---------------------
debezium | wal2json | logical | 26593 | database_name | f | t | 7404 | | 26729 | 0/DCD98E8 | 0/DCD9920
(1 row)
我感兴趣的是,我是否可以可靠地使用该数据,然后使用表上的postgresql元数据来选择从该插槽复制的所有行。
例如,据我所知,这并不起作用,但理想情况下会返回已经复制的行,现在可以安全地从表中修剪:
select * from outbox where age(xmin) < (select age(catalog_xmin) from pg_replication_slots);
任何指导都将是甜蜜的!干杯
我一直在MySQL中使用Debezium实现发件箱模式,并在插入发件箱记录后立即删除它,我在这里看到了这一点https://debezium.io/blog/2019/02/19/reliable-microservices-data-exchange-with-the-outbox-pattern/插入将被拾取并发送,删除将被忽略。因此,从本质上讲,发件箱表中不应该有任何内容(在事务之外(。我还预生成了条目的主键(用于Kafka中的事件ID(,这样我就可以批量插入和删除。
回到这一点,我不得不以不同的方式思考如何将复制进度与发件箱表联系起来。之前在我的问题中,我试图从pg_replication_slot中收集进度,但在这个工作示例中,我改用了pg_stat_replication。我们关心的slot_name可以查询该表,并可以返回滞后结果。例如:
SELECT * FROM outbox WHERE created_at < (SELECT(NOW() - COALESCE(replay_lag, interval '60 seconds')) as stale_time from pg_stat_replication where pg_stat_replication.slot_name = 'outbox_slot');
因此,在这里,这将返回发件箱表中在重播时间或1分钟之外插入的行。