如何使用Debezium从Postgrs变化



流使用debezium

从Postgres变化

已完成的设置:

  1. docker设置。
  2. 启动Postgres,Zookeeper,Kafka和Debezium Connector。
  3. 带有解码器的远程数据库设置,wal2json(Postgres(。
  4. 用卷发连接到Debezium。
  5. 创建一个观察者。

问题:当我启动Watcher时,它正在阅读所有发生的更改,但是当完成任何插入时,Kafka正在为Debezium抛出一个例外,以说" An exception occurred in the change event producer. This connector will be stopped.",而在Watcher中则没有显示。

我对这些概念非常陌生,无法弄清楚我在环境设置中错过了什么,这是我在堆栈溢出中的第一个问题,请忽略我的错误。

主要问题是我本地DB工作正常。
有人可以帮助吗?
预先感谢

019-05-02 14:09:47,242 WARN   Postgres|kafkaserver|records-stream-producer  Closing replication stream due to db connection IO exception...   [io.debezium.connector.postgresql.RecordsStreamProducer]
2019-05-02 14:09:47,365 INFO   ||  WorkerSourceTask{id=kafka-public-connector-0} Committing offsets   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-05-02 14:09:47,366 INFO   ||  WorkerSourceTask{id=kafka-public-connector-0} flushing 0 outstanding messages for offset commit   [org.apache.kafka.connect.runtime.WorkerSourceTask]
2019-05-02 14:09:47,375 ERROR  ||  WorkerSourceTask{id=kafka-public-connector-0} Task threw an uncaught and unrecoverable exception   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: An exception ocurred in the change event producer. This connector will be stopped.
    at io.debezium.connector.base.ChangeEventQueue.throwProducerFailureIfPresent(ChangeEventQueue.java:170)
    at io.debezium.connector.base.ChangeEventQueue.poll(ChangeEventQueue.java:151)
    at io.debezium.connector.postgresql.PostgresConnectorTask.poll(PostgresConnectorTask.java:161)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:245)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:221)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: Database connection failed when reading from copy
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1037)
    at org.postgresql.core.v3.CopyDualImpl.readFromCopy(CopyDualImpl.java:41)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.receiveNextData(V3PGReplicationStream.java:155)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.readInternal(V3PGReplicationStream.java:124)
    at org.postgresql.core.v3.replication.V3PGReplicationStream.read(V3PGReplicationStream.java:70)
    at io.debezium.connector.postgresql.connection.PostgresReplicationConnection$1.read(PostgresReplicationConnection.java:251)
    at io.debezium.connector.postgresql.RecordsStreamProducer.streamChanges(RecordsStreamProducer.java:134)
    at io.debezium.connector.postgresql.RecordsStreamProducer.lambda$start$0(RecordsStreamProducer.java:120)
    ... 5 more
Caused by: java.io.EOFException
    at org.postgresql.core.PGStream.receiveChar(PGStream.java:308)
    at org.postgresql.core.v3.QueryExecutorImpl.processCopyResults(QueryExecutorImpl.java:1079)
    at org.postgresql.core.v3.QueryExecutorImpl.readFromCopy(QueryExecutorImpl.java:1035)
    ... 12 more
2019-05-02 14:09:47,387 ERROR  ||  WorkerSourceTask{id=kafka-public-connector-0} Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
  • 是否有解决此问题的完整指南?
  • 主要目标是,我有一个数据库,其中有大量数据应用程序(生产者(从另一台服务器中获取数据并保留我们自己的数据库和另一个应用程序(消费者(中的所有数据都是商店并应用业务逻辑和前端。我在这里想用另一个应用程序(消费者(将命中替换为DB这个Debezium和Kafka部分。
  • 或有任何方法。

感谢所有。上述问题已解决。远程数据库中设置的实际问题。很少有其他依赖项无法正确安装,例如PostGIS,Protobuf-C,Decoderbufs已正确安装问题。

对我来说,我检查了RDS db实例的replication_slot大小,但它不活动。我使用以下查询获取replication_slot的状态

select slot_name, pg_size_pretty(pg_xlog_location_diff(pg_current_xlog_location(),restart_lsn)) as replicationSlotLag, active from pg_replication_slots ;

如果Actice列的值" F",则表示它是无活动的,并且连接器无法连接到DB。由于复制插槽已经不活跃,因此我使用以下语句

将其丢弃了
select pg_drop_replication_slot('your_slot_name');

重新启动连接器后,解决了此错误。

相关内容

  • 没有找到相关文章

最新更新