我确实配置了独立的Debezium并测试了流媒体。之后,我创建了一个管道如下
pipeline.apply("Read from DebeziumIO",
DebeziumIO.<String>read()
.withConnectorConfiguration(
DebeziumIO.ConnectorConfiguration.create()
.withUsername("user")
.withPassword("password")
.withHostName("hostname")
.withPort("1433")
.withConnectorClass(SqlServerConnector.class)
.withConnectionProperty("database.server.name", "customer")
.withConnectionProperty("database.dbname", "test001")
.withConnectionProperty("database.include.list", "test002")
.withConnectionProperty("include.schema.changes", "true")
.withConnectionProperty("database.history.kafka.bootstrap.servers", "kafka:9092")
.withConnectionProperty("database.history.kafka.topic", "schema-changes.inventory")
.withConnectionProperty("connect.keep.alive", "false")
.withConnectionProperty("connect.keep.alive.interval.ms", "200")
).withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper()).withCoder(StringUtf8Coder.of())
)
当我使用DirectRunner启动管道时,管道不会捕获数据流。在我的管道代码中,我只是暂时添加了将数据转储到控制台的代码。
此外,从日志中,我观察到Debezium正在频繁地启动和停止。这是故意的吗?此外,当数据库发生更改(INSERT/DELETE/UPDATE(时,我发现它不会反映在日志中。
所以我的问题是,
- 我提供的配置是否足够
- 为什么在发生更改时未触发管道
- 我还需要执行哪些步骤才能使其正常工作
多次重新启动debezium会对性能造成影响。因为它创建了一个jdbc连接。