Flink SQL:使用更新日志流更新动态表中的行



我有一个包含JSON消息的流,如下所示:

{"operation":"CREATE","data":{"id":"id-1", "value":"value-1"}}
{"operation":"CREATE","data":{"id":"id-2", "value":"value-2"}}
{"operation":"DELETE","data":{"id":"id-1"}}
{"operation":"UPDATE","data":{"id":"id-2", "value":"value-3"}}

此流在注册为TableSourceDataStream<Row>中处理。

我想将此流用作更新日志流来更新Flink 表的内容,但我找不到一种方法来做到这一点。

我将StreamTableSource定义为:

public class MyTableSource implements StreamTableSource<Row>, ... {
@Override
public DataStream<Row> getDataStream(final StreamExecutionEnvironment env) {
DataStream<Row> stream = getDataStream(env) // Retrieve changelog stream 
.keyBy([SOME KEY])                  // Aggregate by key 
.map(new MyMapFunction());          // Map the update message with the correct encoding ?
return stream;
}
... 
}

而这个TableSource用于

public void process(final StreamExecutionEnvironment env) {
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.registerTableSource("MyTableSource", new MyTableSource());
Table result = tableEnv.sqlQuery("SELECT * FROM MyTableSource"); // This table content should be updated according to operation described in the changelog stream.
result.insertInto([SOME SINK]);
}

这样做的好方法是什么?(更具体地说,如何使用流从表中删除行?

目前,内部更改日志处理功能不通过 API 公开。因此,没有可用的源代码允许您将传入的更改日志解释为表。这是计划在 Flink 1.11 中实现的。

在此之前,您可以考虑使用用户定义的聚合函数来应用此处建议的更新:

Apache Flink:如何为动态表启用"upsert 模式"?

相关内容

  • 没有找到相关文章

最新更新