首先感谢你们的辛勤工作和支持。我正在联系,因为我已经尽了所有的努力来确定我在Confluent Kafka Connect(6.0.x(中托管的Debezium Postgres CDC Source Connector(1.4.1(遇到的问题。我们看到的问题是,连接器在一天中的随机时间似乎会停滞,Postgres WAL滞后将开始攀升,连接器在很大程度上无法恢复,需要重新启动连接器任务。
在一个问题期间,我们在Kafka Connect实例上运行了Java Flight Recorder,并得出了以下堆栈跟踪:
at java.sql.SQLException.setNextException(java.sql.SQLException)
at void java.sql.SQLWarning.setNextWarning(java.sql.SQLWarning)
at void org.postgresql.core.QueryExecutorBase.addWarning(java.sql.SQLWarning)
at org.postgresql.core.v3.CopyOperationImpl org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(org.postgresql.core.v3.CopyOperationImpl, boolean)
at void org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(org.postgresql.core.v3.CopyOperationImpl, boolean)
at byte[] org.postgresql.core.v3.CopyDualImpl.readFromCopy(boolean)
at java.nio.ByteBuffer org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(boolean)
at java.nio.ByteBuffer org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(boolean)
at java.nio.ByteBuffer org.postgresql.core.v3.replication.V3PGReplicationStream.readPending()
at boolean io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.readPending(io.debezium.connector.postgresql.connection.ReplicationStream$ReplicationMessageProcessor)
at void io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.processMessages(io.debezium.pipeline.source.spi.ChangeEventSource$ChangeEventSourceContext, io.debezium.connector.postgresql.connection.ReplicationStream)
at void io.debezium.connector.postgresql.PostgresStreamingChangeEventSource.execute(io.debezium.pipeline.source.spi.ChangeEventSource$ChangeEventSourceContext)
at void io.debezium.pipeline.ChangeEventSourceCoordinator.streamEvents(io.debezium.pipeline.spi.OffsetContext, io.debezium.pipeline.source.spi.ChangeEventSource$ChangeEventSourceContext)
at void io.debezium.pipeline.ChangeEventSourceCoordinator.lambda$start$0()
at void io.debezium.pipeline.ChangeEventSourceCoordinator$$Lambda$860.1129257263.run()
at java.lang.Object java.util.concurrent.Executors$RunnableAdapter.call()
at void java.util.concurrent.FutureTask.run() at void java.util.concurrent.ThreadPoolExecutor.runWorker(java.util.concurrent.ThreadPoolExecutor$Worker)
at void java.util.concurrent.ThreadPoolExecutor$Worker.run() at void java.lang.Thread.run()
通过Github,我可以找到跟踪中描述的addWarning
方法调用。它似乎是由于响应中的N
字符而发生的。我试着在谷歌上搜索,以更好地了解正在发生的事情或这意味着什么,但最终一无所获。
源代码中调用addWarning的行(我认为(-https://github.com/pgjdbc/pgjdbc/blob/45a771fccd77c1211d8638111ee3e9934849b781/pgjdbc/src/main/java/org/postgresql/core/v3/QueryExecutorImpl.java#L1217
这个问题已经存在了一段时间,我们只是在处理它
我们尝试过的几件事:
- 调整连接器配置、批量大小、林格等
- 调整堆
- 调整后的垃圾收集参数
- 升级到Debezium 1.4.1
- 在Kubernetes中为pod分配更多CPU
在这个不可恢复的错误期间,没有日志输出到stdout,除了上面的堆栈跟踪之外,我没有任何其他信息,因为我无法远程调试。很想知道是否有其他人遇到过这个问题,或者非常熟悉Debezium Source。
已解决:
我们通过在连接器配置中为slot.stream.params
属性提供筛选规则来解决此问题。通过这样做,我们调整了复制槽以过滤掉不必要的表,只发送我们需要的表,在将它们发送到连接器之前,这种过滤是在服务器端完成的。
有关此配置属性的更多信息,您可以在此处查看:https://debezium.io/documentation/reference/stable/connectors/postgresql.html#postgresql-属性槽流参数
示例配置:
{
"name": "fulfillment-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"database.hostname": "192.168.99.100",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "postgres",
"topic.prefix": "fulfillment",
"table.include.list": "public.inventory",
"slot.stream.params": "add-tables=public.inventory" // This enabled the filtering server-side
}
}