使用SQL Server读取的DebeziumIO未使用GCP中的Apache beam进行流式传输



我确实配置了独立的Debezium并测试了流媒体。之后,我创建了一个管道如下

pipeline.apply("Read from DebeziumIO",
DebeziumIO.<String>read()
.withConnectorConfiguration(
DebeziumIO.ConnectorConfiguration.create()
.withUsername("user")
.withPassword("password")
.withHostName("hostname")
.withPort("1433")
.withConnectorClass(SqlServerConnector.class)
.withConnectionProperty("database.server.name", "customer")
.withConnectionProperty("database.dbname", "test001")
.withConnectionProperty("database.include.list", "test002")
.withConnectionProperty("include.schema.changes", "true")
.withConnectionProperty("database.history.kafka.bootstrap.servers", "kafka:9092")  
.withConnectionProperty("database.history.kafka.topic", "schema-changes.inventory") 
.withConnectionProperty("connect.keep.alive", "false")               
.withConnectionProperty("connect.keep.alive.interval.ms", "200")
).withFormatFunction(new SourceRecordJson.SourceRecordJsonMapper()).withCoder(StringUtf8Coder.of())
)

当我使用DirectRunner启动管道时,管道不会捕获数据流。在我的管道代码中,我只是暂时添加了将数据转储到控制台的代码。

此外,从日志中,我观察到Debezium正在频繁地启动和停止。这是故意的吗?此外,当数据库发生更改(INSERT/DELETE/UPDATE(时,我发现它不会反映在日志中。

所以我的问题是,

  1. 我提供的配置是否足够
  2. 为什么在发生更改时未触发管道
  3. 我还需要执行哪些步骤才能使其正常工作

多次重新启动debezium会对性能造成影响。因为它创建了一个jdbc连接。

相关内容

  • 没有找到相关文章

最新更新