Postgres WAL滞后与Debezium CDC源连接器,需要重新启动连接器来修复



首先感谢你们的辛勤工作和支持。我正在联系,因为我已经尽了所有的努力来确定我在Confluent Kafka Connect(6.0.x(中托管的Debezium Postgres CDC Source Connector(1.4.1(遇到的问题。我们看到的问题是,连接器在一天中的随机时间似乎会停滞,Postgres WAL滞后将开始攀升,连接器在很大程度上无法恢复,需要重新启动连接器任务。

在一个问题期间,我们在Kafka Connect实例上运行了Java Flight Recorder,并得出了以下堆栈跟踪:

at java.sql.SQLException.setNextException(java.sql.SQLException)
at void java.sql.SQLWarning.setNextWarning(java.sql.SQLWarning) 
at void org.postgresql.core.QueryExecutorBase.addWarning(java.sql.SQLWarning) 
at org.postgresql.core.v3.CopyOperationImpl org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(org.postgresql.core.v3.CopyOperationImpl, boolean) 
at void org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(org.postgresql.core.v3.CopyOperationImpl, boolean) 
at byte[] org.postgresql.core.v3.CopyDualImpl.readFromCopy(boolean) 
at java.nio.ByteBuffer org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(boolean) 
at java.nio.ByteBuffer org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(boolean) 
at java.nio.ByteBuffer org.postgresql.core.v3.replication.V3PGReplicationStream.readPending() 
at boolean io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(io.debezium.connector.postgresql.connection.ReplicationStream$ReplicationMessageProcessor) 
at void io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(io.debezium.pipeline.source.spi.ChangeEventSource$ChangeEventSourceContext, io.debezium.connector.postgresql.connection.ReplicationStream) 
at void io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(io.debezium.pipeline.source.spi.ChangeEventSource$ChangeEventSourceContext) 
at void io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(io.debezium.pipeline.spi.OffsetContext, io.debezium.pipeline.source.spi.ChangeEventSource$ChangeEventSourceContext) 
at void io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0() 
at void io.debezium.pipeline.ChangeEventSourceCoordinator$$Lambda$860.1129257263.run() 
at java.lang.Object java.util.concurrent.Executors$RunnableAdapter.call() 
at void java.util.concurrent.FutureTask.run() at void java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker) 
at void java.util.concurrent.ThreadPoolExecutor$Worker.run() at void java.lang.Thread.run()

通过Github,我可以找到跟踪中描述的addWarning方法调用。它似乎是由于响应中的N字符而发生的。我试着在谷歌上搜索,以更好地了解正在发生的事情或这意味着什么,但最终一无所获。

源代码中调用addWarning的行(我认为(-https://github.com/pgjdbc/pgjdbc/blob/45a771fccd77c1211d8638111ee3e9934849b781/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java#L1217

这个问题已经存在了一段时间,我们只是在处理它

我们尝试过的几件事:

  • 调整连接器配置、批量大小、林格等
  • 调整堆
  • 调整后的垃圾收集参数
  • 升级到Debezium 1.4.1
  • 在Kubernetes中为pod分配更多CPU

在这个不可恢复的错误期间,没有日志输出到stdout,除了上面的堆栈跟踪之外,我没有任何其他信息,因为我无法远程调试。很想知道是否有其他人遇到过这个问题,或者非常熟悉Debezium Source。

已解决:

我们通过在连接器配置中为slot.stream.params属性提供筛选规则来解决此问题。通过这样做,我们调整了复制槽以过滤掉不必要的表,只发送我们需要的表,在将它们发送到连接器之前,这种过滤是在服务器端完成的。

有关此配置属性的更多信息,您可以在此处查看:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-属性槽流参数

示例配置:

{
"name": "fulfillment-connector",  
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
"database.hostname": "192.168.99.100", 
"database.port": "5432", 
"database.user": "postgres", 
"database.password": "postgres", 
"database.dbname" : "postgres", 
"topic.prefix": "fulfillment", 
"table.include.list": "public.inventory",
"slot.stream.params": "add-tables=public.inventory" // This enabled the filtering server-side
}
}

相关内容

  • 没有找到相关文章

最新更新