我想在Postgres数据库中使用一个表作为输入文档的存储(将有数十亿个输入文档)。不断添加文档(使用"UPSERT"逻辑以避免重复),并且很少从表中删除。
将有多个worker应用程序应该从这个表中连续读取数据,从第一个插入的行到最新的行,然后在插入新行时轮询,每一行只读取一次。此外,当worker的处理算法发生变化时,应该从第一行重新读取所有数据。每个应用程序应该能够维护自己的行处理进程,独立于其他应用程序。
我正在寻找一种方法来跟踪最后处理的行,以便能够在任何时候暂停和继续轮询。
我可以想到这些选项:
使用自增字段
然后将最后处理行的自动增量字段值存储在某个地方,以便在下一个查询中使用它,如:
SELECT * FROM document WHERE id > :last_processed_id LIMIT 100;
但是经过一些研究,我发现在并发环境中,的行可能比低。自动增量值将在高于的行之后对客户端可见。值,因此可以跳过某些行。
使用时间戳字段
此选项的问题是时间戳不是唯一的,并且在高插入率期间可能重叠,这再次导致跳过行。此外,调整系统时间(手动或通过NTP)可能会导致不可预测的结果。
为每一行添加一个流程完成标志
这是我能想到的唯一真正可靠的方法,但是它有缺点,包括需要在处理后更新每一行,需要额外的存储来存储每个应用程序的完成标志字段,并且运行一个新的应用程序可能需要更改DB模式。这是我的最后一招,如果有更优雅的方法,我希望避免它。
我知道,任务定义尖叫我应该使用Kafka,但它的问题是它不允许从一个主题中删除单个消息,我需要这个功能。对我来说,在处理过程中应该跳过的Kafka记录的外部列表非常笨拙和低效。此外,Kafka的实时重复数据删除也需要一些外部存储。
我想知道是否有其他的,更有效的方法来解决这个问题使用Postgres数据库。
我最终为每一行保存事务id,然后选择txid值低于具有最小id的事务的记录,如下所示:
SELECT * FROM document
WHERE ((txid = :last_processed_txid AND id > :last_processed_id) OR txid > :last_processed_txid)
AND txid < pg_snapshot_xmin(pg_current_snapshot())
ORDER BY txid, id
LIMIT 100
这样,即使在事务#1之后启动的事务#2完成得比第一个事务快,它所写的行也不会被消费者读取,直到事务#1完成。
Postgres docs声明
xid8值严格单调递增,在数据库集群的生命周期内不能重用
所以它应该适合我的情况
这个解决方案不是很节省空间,因为必须为每一行保存一个额外的8字节的txid字段,并且应该为txid字段创建索引,但是与其他方法相比,这里的主要优点是:
- 数据库模式在添加新消费者的情况下保持不变
- 不需要更新将行标记为已处理,消费者只应该保留最后处理行的id和txid值
- 系统时钟漂移或调整不会导致行被跳过
- 在使用预分配池生成的多个生产者插入带有id的行(例如,生产者1此时插入id为1的行)时,为每一行设置txid有助于按插入顺序查询数据。100,生产者2 - 101..200等)