我正试图在我的AWS RDS Aurora Postgres 11.9实例上检测我的三个逻辑复制插槽是否正在备份。我正在使用wal2json插件不断地读取它们。python进程正在读取其中两个插槽。三是kafka连接消费者。
我正在使用下面的查询,但得到了赔率结果。据说我的两个插槽甚至在半夜负载很小的时候也落后了几个GB。我是不是误解了查询的意思?
SELECT redo_lsn, slot_name,restart_lsn,
round((redo_lsn-restart_lsn) / 1024 / 1024 / 1024, 2) AS GB_behind
FROM pg_control_checkpoint(), pg_replication_slots;
我检查过的东西:
- 我已经检查了消费者是否仍在运行
- 我还查看了日志,插入的行的时间戳在插入后0-2秒内从数据库中出来。所以看起来我并没有落后
- 我已经进行了一个端到端的测试,数据在几秒钟内就通过了我的管道,所以它消耗数据的速度肯定相对较快
- 我为python进程使用的两个插槽的
GB_behind
值相同,当前为12.40
。尽管这两个插槽位于不同的逻辑数据库上,这些数据库具有显著不同的负载(其中一个负载高出约1000倍( - 我有一个第三个复制插槽正在被另一个程序读取(kafka-connect(。显示
0
GB_behind
即使在峰值负载下,我的工作负载也不可能在几秒钟内(甚至几分钟内(生成12.4GB的数据。我是不是错过了口译?是否有更好的方法来检查复制插槽落后了多远?
非常感谢!
以下是我的一小段代码(python3.6(,以防有帮助,但我已经使用了一段时间,数据一直在工作:
def consume(msg):
print(msg.payload)
try:
kinesis_client.put_record(StreamName=STREAM_NAME, Data=msg.payload, PartitionKey=partition_key)
except:
logger.exception('PG ETL: Failed to send load to kinesis. Likely too large.')
with con.cursor() as cur:
cur.start_replication(slot_name=replication_slot, options = {'pretty-print' : 1}, decode=True)
cur.consume_stream(consume)
在消费函数期间,我没有正确执行send_feedback
。所以我消耗了记录,但我没有告诉Postgres复制槽我已经消耗了记录。
以下是我的完整消费功能,以防其他人感兴趣:
def consume(msg):
print(msg.payload)
try:
kinesis_client.put_record(StreamName=STREAM_NAME, Data=msg.payload, PartitionKey=partition_key)
except:
logger.exception('PG ETL: Failed to send load to kinesis. Likely too large.')
msg.cursor.send_feedback(flush_lsn=msg.data_start)
with con.cursor() as cur:
cur.start_replication(slot_name=replication_slot, options = {'pretty-print' : 1}, decode=True)
cur.consume_stream(consume)