从Postgres表轮询数据的可靠方法



我想在Postgres数据库中使用一个表作为输入文档的存储(将有数十亿个输入文档)。不断添加文档(使用"UPSERT"逻辑以避免重复),并且很少从表中删除。

将有多个worker应用程序应该从这个表中连续读取数据,从第一个插入的行到最新的行,然后在插入新行时轮询,每一行只读取一次。此外,当worker的处理算法发生变化时,应该从第一行重新读取所有数据。每个应用程序应该能够维护自己的行处理进程,独立于其他应用程序。

我正在寻找一种方法来跟踪最后处理的行,以便能够在任何时候暂停和继续轮询。

我可以想到这些选项:

使用自增字段

然后将最后处理行的自动增量字段值存储在某个地方,以便在下一个查询中使用它,如:

SELECT * FROM document WHERE id > :last_processed_id LIMIT 100;

但是经过一些研究,我发现在并发环境中,的行可能比低。自动增量值将在高于的行之后对客户端可见。值,因此可以跳过某些行。

使用时间戳字段

此选项的问题是时间戳不是唯一的,并且在高插入率期间可能重叠,这再次导致跳过行。此外,调整系统时间(手动或通过NTP)可能会导致不可预测的结果。

为每一行添加一个流程完成标志

这是我能想到的唯一真正可靠的方法,但是它有缺点,包括需要在处理后更新每一行,需要额外的存储来存储每个应用程序的完成标志字段,并且运行一个新的应用程序可能需要更改DB模式。这是我的最后一招,如果有更优雅的方法,我希望避免它。

我知道,任务定义尖叫我应该使用Kafka,但它的问题是它不允许从一个主题中删除单个消息,我需要这个功能。对我来说,在处理过程中应该跳过的Kafka记录的外部列表非常笨拙和低效。此外,Kafka的实时重复数据删除也需要一些外部存储。

我想知道是否有其他的,更有效的方法来解决这个问题使用Postgres数据库。

我最终为每一行保存事务id,然后选择txid值低于具有最小id的事务的记录,如下所示:

SELECT * FROM document
WHERE ((txid = :last_processed_txid AND id > :last_processed_id) OR txid > :last_processed_txid) 
AND txid < pg_snapshot_xmin(pg_current_snapshot())
ORDER BY txid, id
LIMIT 100

这样,即使在事务#1之后启动的事务#2完成得比第一个事务快,它所写的行也不会被消费者读取,直到事务#1完成。

Postgres docs声明

xid8值严格单调递增,在数据库集群的生命周期内不能重用

所以它应该适合我的情况

这个解决方案不是很节省空间,因为必须为每一行保存一个额外的8字节的txid字段,并且应该为txid字段创建索引,但是与其他方法相比,这里的主要优点是:

  • 数据库模式在添加新消费者的情况下保持不变
  • 不需要更新将行标记为已处理,消费者只应该保留最后处理行的id和txid值
  • 系统时钟漂移或调整不会导致行被跳过
  • 在使用预分配池生成的多个生产者插入带有id的行(例如,生产者1此时插入id为1的行)时,为每一行设置txid有助于按插入顺序查询数据。100,生产者2 - 101..200等)