Debezium-Oracle连接器-服务未启动



DebeziumEngine正在查找kafka主题事件,尽管我没有为offset.storage指定KafkaOffsetBackingStore

参考:DebeziumEngine Config

配置

Configuration config = Configuration.create()
.with("name", "oracle_debezium_connector")
.with("connector.class", "io.debezium.connector.oracle.OracleConnector")
.with("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore")
.with("offset.storage.file.filename", "/Users/dk/Documents/work/ACET/offset.dat") 
.with("offset.flush.interval.ms", 2000)
.with("database.hostname", "localhost") 
.with("database.port", "1521")
.with("database.user", "pravin")
.with("database.password", "*****")
.with("database.sid", "ORCLCDB")
.with("database.server.name", "mServer")
.with("database.out.server.name", "dbzxout")
.with("database.history", "io.debezium.relational.history.FileDatabaseHistory")
.with("database.history.file.filename", "/Users/dk/Documents/work/ACET/dbhistory.dat")          
.with("topic.prefix","cycowner")
.with("database.dbname", "ORCLCDB")
.build();

DebeziumEngine

DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
.using(config.asProperties())
.using(connectorCallback)
.using(completionCallback)
.notifying(record -> {
System.out.println(record);
})
.build();

错误:

2022-10-29T16:06:16,457 ERROR [pool-2-thread-1] i.d.c.Configuration: The 'schema.history.internal.kafka.topic' value is invalid: A value is required
2022-10-29T16:06:16,457 ERROR [pool-2-thread-1] i.d.c.Configuration: The 'schema.history.internal.kafka.bootstrap.servers' value is invalid: A value is required**
2022-10-29T16:06:16,458 INFO  [pool-2-thread-1] i.d.c.c.BaseSourceTask: Stopping down connector
2022-10-29T16:06:16,463 INFO  [pool-3-thread-1] i.d.j.JdbcConnection: Connection gracefully closed
2022-10-29T16:06:16,465 INFO  [pool-2-thread-1] o.a.k.c.s.FileOffsetBackingStore: Stopped FileOffsetBackingStore
connector stopped successfully
---------------------------------------------------
success status: false, message : Unable to initialize and start connector's task class 'io.debezium.connector.oracle.OracleConnectorTask' with config: {connector.class=io.debezium.connector.oracle.OracleConnector, database.history.file.filename=/Users/dkuma416/Documents/work/ACET/dbhistory.dat, database.user=pravin, database.dbname=ORCLCDB, offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore, database.server.name=mServer, offset.flush.timeout.ms=5000, errors.retry.delay.max.ms=10000, database.port=1521, database.sid=ORCLCDB, offset.flush.interval.ms=2000, topic.prefix=cycowner, offset.storage.file.filename=/Users/dkuma416/Documents/work/ACET/offset.dat, errors.max.retries=-1, database.hostname=localhost, database.password=********, name=oracle_debezium_connector, database.out.server.name=dbzxout, errors.retry.delay.initial.ms=300, value.converter=org.apache.kafka.connect.json.JsonConverter, key.converter=org.apache.kafka.connect.json.JsonConverter, database.history=io.debezium.relational.history.MemoryDatabaseHistory}, **Error: Error configuring an instance of KafkaSchemaHistory; check the logs for details**

由于某些原因,您应该提供topic.prefixschema.history.internalschema.history.internal.file.filename属性,正如他们的文档中所提到的那样

例如:

.with("topic.prefix", "topic")
.with("schema.history.internal", "io.debezium.storage.file.history.FileSchemaHistory")
.with("schema.history.internal.file.filename", "/Users/dk/Documents/work/ACET/schistory.dat")

相关内容

  • 没有找到相关文章

最新更新