How to use Debezium connectors with Apache Flink



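I'm trying to create a table with Flink's Table API that uses a Debezium source function. I found an implementation of these functions at https://github.com/ververica/flink-cdc-connectors and use it in my code like this: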

import java.util.Properties
import com.alibaba.ververica.cdc.connectors.postgres.PostgreSQLSource
import com.alibaba.ververica.cdc.debezium.DebeziumSourceFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

// TestCharge and CustomDebeziumDeserializer are my own classes (not shown here)
val debeziumProperties = new Properties()
debeziumProperties.setProperty("plugin.name", "wal2json")
debeziumProperties.setProperty("format", "debezium-json")
val sourceFunction: DebeziumSourceFunction[TestCharge] = PostgreSQLSource.builder[TestCharge]()
  .hostname("******")
  .port(5432)
  .database("*****") // monitor all tables in this database
  .username("*****")
  .password("*****")
  .debeziumProperties(debeziumProperties)
  .deserializer(new CustomDebeziumDeserializer) // converts each SourceRecord into a TestCharge
  .build()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val sSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
val sTableEnv = StreamTableEnvironment.create(env, sSettings)
val cdcStream: DataStream[TestCharge] = env
  .addSource(sourceFunction)
  .map(x => x)
sTableEnv.createTemporaryView("historic", cdcStream, 'chargeId, 'email, 'amount, 'cardHash)
val table: Table = sTableEnv.sqlQuery("SELECT SUM(amount) FROM historic GROUP BY chargeId")
val reverse = sTableEnv.toRetractStream[Row](table)
reverse.print()
// the DataStream job only starts once execute() is called
env.execute("postgres-cdc-job")

I also added the dependency described in the documentation:

"com.alibaba.ververica" % "flink-sql-connector-postgres-cdc" % "1.1.0"

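If the job is shipped to the cluster as a fat jar built with the sbt-assembly plugin (an assumption; the question does not say how the jar is packaged), the merge strategy is worth checking. A custom rule that discards everything under META-INF also drops the META-INF/services files that Java's ServiceLoader reads, and Debezium appears to locate its engine implementation that way. A minimal build.sbt sketch that keeps those registrations and defers everything else to the plugin's default:

```scala
// build.sbt sketch; assumes the sbt-assembly plugin is enabled (sbt 1.x slash syntax)
assembly / assemblyMergeStrategy := {
  // Concatenate ServiceLoader registrations from all jars (dropping duplicate lines),
  // so entries such as Debezium's engine builder factory survive the merge
  case PathList("META-INF", "services", _*) => MergeStrategy.filterDistinctLines
  // Everything else: fall back to the plugin's default strategy
  case other =>
    val defaultStrategy = (assembly / assemblyMergeStrategy).value
    defaultStrategy(other)
}
```

As far as we know, sbt-assembly's default strategy already handles META-INF/services like this, so the setting only matters if the build overrides the default with a broader discard rule.
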
When I run the job locally on a mini-cluster it works fine, but on a Flink cluster deployed on Kubernetes it throws this exception:

Caused by: io.debezium.DebeziumException: No implementation of Debezium engine builder was found

Does anyone know what could be causing this? Or am I missing a dependency?

Thanks in advance.

If you want to use it in the Table API/SQL, you can register the table with SQL DDL:

sTableEnv.executeSql(
"""
|CREATE TABLE shipments (
|  shipment_id INT,
|  order_id INT,
|  origin STRING,
|  destination STRING,
|  is_arrived BOOLEAN
|) WITH (
|  'connector' = 'postgres-cdc',
|  'hostname' = 'localhost',
|  'port' = '5432',
|  'username' = 'postgres',
|  'password' = 'postgres',
|  'database-name' = 'postgres',
|  'schema-name' = 'public',
|  'table-name' = 'shipments'
|)
|""".stripMargin)
// then you can query the table
val table: Table = sTableEnv.sqlQuery("SELECT SUM(shipment_id) FROM shipments GROUP BY order_id")
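Because this query aggregates over a changelog, its result is an updating table. Converted with toRetractStream[Row] (as in the question), each change arrives as a (Boolean, Row) pair, where false retracts an earlier result and true adds a new one. The plain-Scala sketch below models that output for SUM(...) GROUP BY over insert-only input; it is a simplified illustration, not Flink's implementation, and the sample (order_id, shipment_id) rows are made up.

```scala
import scala.collection.mutable

// Simplified model, not Flink's implementation: the retract stream that
// toRetractStream[Row] would produce for SELECT SUM(v) FROM t GROUP BY k
// over insert-only input. Each output element is (isAdd, sum).
object RetractSketch {
  def sumByKeyRetract(inserts: Seq[(Int, Long)]): List[(Boolean, Long)] = {
    val sums = mutable.Map.empty[Int, Long]
    val out = mutable.ListBuffer.empty[(Boolean, Long)]
    for ((key, value) <- inserts) {
      sums.get(key) match {
        case None =>
          // First row for this key: emit the new result as an addition
          sums(key) = value
          out += ((true, value))
        case Some(previous) =>
          // Existing key: retract the previous result, then add the updated one.
          // (Flink may skip the pair when the result is unchanged; not modeled here.)
          val updated = previous + value
          sums(key) = updated
          out += ((false, previous))
          out += ((true, updated))
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    // Made-up (order_id, shipment_id) rows
    sumByKeyRetract(Seq((1, 10L), (2, 20L), (1, 11L))).foreach(println)
  }
}
```

For the sample rows the sketch emits (true,10), (true,20), (false,10), (true,21): the second row for order 1 first retracts the old sum 10 and then adds the new sum 21.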

This is the simplest way to work with CDC, because the Table API currently does not support converting a changelog stream into a Table.
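A changelog stream carries row kinds rather than plain inserts. As far as we understand Flink's CDC support, a Debezium change event is turned into changelog rows roughly as sketched below: op "c" (create) or "r" (snapshot read) becomes an insert, "u" becomes an update-before/update-after pair, and "d" becomes a delete. The types are hypothetical stand-ins written for illustration; Flink itself marks rows with org.apache.flink.types.RowKind.

```scala
// Hypothetical stand-in types for illustration; Flink's real API uses
// org.apache.flink.types.RowKind on Row/RowData.
object ChangelogSketch {
  type Image = Map[String, String]

  sealed trait Kind
  case object Insert extends Kind       // +I
  case object UpdateBefore extends Kind // -U
  case object UpdateAfter extends Kind  // +U
  case object Delete extends Kind       // -D

  // op: "c" = create, "r" = snapshot read, "u" = update, "d" = delete
  final case class ChangeEvent(op: String, before: Option[Image], after: Option[Image])

  // Fails loudly instead of guessing when a required row image is absent
  private def imageOf(image: Option[Image], which: String, op: String): Image =
    image.getOrElse(throw new IllegalArgumentException(s"op '$op' has no '$which' image"))

  def toChangelog(e: ChangeEvent): List[(Kind, Image)] = e.op match {
    case "c" | "r" => List(Insert -> imageOf(e.after, "after", e.op))
    case "u" =>
      // PostgreSQL may omit the before image unless the table uses REPLICA IDENTITY FULL
      List(UpdateBefore -> imageOf(e.before, "before", e.op), UpdateAfter -> imageOf(e.after, "after", e.op))
    case "d" => List(Delete -> imageOf(e.before, "before", e.op))
    case other => throw new IllegalArgumentException(s"unknown op '$other'")
  }
}
```

An update therefore reaches downstream operators as two rows, which is why an aggregation over such a table has to be able to retract earlier results.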

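As for your problem, I think it may be caused by a dependency conflict. Please check whether you depend on another version of <artifactId>debezium-embedded</artifactId>; if so, remove it. flink-sql-connector-postgres-cdc already bundles it (version 1.12).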
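To narrow down whether the Debezium engine implementation is visible at all, a small check with java.util.ServiceLoader can help. The sketch assumes, based on our reading of Debezium 1.x, that the exception is thrown when DebeziumEngine.create finds no provider for the interface io.debezium.engine.DebeziumEngine$BuilderFactory (registered by debezium-embedded); verify that name against the Debezium version bundled in the connector. Running it with the same jar and class loader as the failing job shows whether the registration survived packaging or is shadowed by a different Debezium version.

```scala
import java.util.{ServiceConfigurationError, ServiceLoader}
import scala.collection.mutable.ListBuffer

object ServiceCheck {
  /** Returns the provider class names that ServiceLoader finds for `interfaceName`
    * via `loader`, or Left(reason) if the interface or a provider cannot be loaded. */
  def providers(interfaceName: String, loader: ClassLoader): Either[String, List[String]] =
    try {
      val iface = Class.forName(interfaceName, false, loader).asInstanceOf[Class[AnyRef]]
      val it = ServiceLoader.load(iface, loader).iterator()
      val names = ListBuffer.empty[String]
      // next() instantiates each provider, so a broken registration fails here
      while (it.hasNext) names += it.next().getClass.getName
      Right(names.toList)
    } catch {
      case _: ClassNotFoundException => Left(s"$interfaceName is not on the classpath")
      case e: ServiceConfigurationError => Left(s"broken provider registration: ${e.getMessage}")
    }

  def main(args: Array[String]): Unit = {
    val loader = Thread.currentThread().getContextClassLoader
    // Interface name assumed from Debezium 1.x; check it against the bundled version
    println(providers("io.debezium.engine.DebeziumEngine$BuilderFactory", loader))
  }
}
```

An empty Right(List()) means the interface is present but no implementation is registered, which matches the reported exception; Left(...) means Debezium's engine API itself is missing from that class loader.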
