如何即时将模式连接到flink DataStream



我正在处理数据库突变流,即更改日志流。我希望能够使用SQL查询来转换值。我很难将以下三个概念汇总在一起 RowTypeInfoRowDataStream

注意:我不知道该架构。我使用Mutation对象中的数据(Mutation是自定义类型(构建它

更具体地说,我有看起来像这样的代码。

val execEnv = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv: StreamTableEnvironment = TableEnvironment.getTableEnvironment(execEnv)
// Mutation is a custom type
val mutationStream: DataStream[Mutation] = ...
// toRows returns an object of type org.apache.flink.types.Row
val rowStream:DataStream[Row] = mutationStream.flatMap({mutation => toRows(mutation)})
tableEnv.registerDataStream("spinal_tap_table", rowStream)
tableEnv.sql("select col1 + 2")

注意:Row对象是位置,并且没有列名称的占位符。我找不到将架构连接到DataStream对象的地方。

我想传递某种类似于Row的结构,该结构包含查询的完整信息{columnName: String, columnValue: Object, columnType: TypeInformation[_]}

在Flink SQL中,当Table定义时,表格架构是必不可少的。不可能在动态键入记录上运行查询。

关于RowTypeInfoRowDataStream的概念:

  • Row是保存数据的实际记录
  • RowTypeInfoRow s的架构描述。它包含Row的每个字段的名称和TypeInformation
  • DataStream是逻辑记录流。DataStream[Row]是一排。请注意,这不是实际流,而只是代表API中的流的API概念。

相关内容

  • 没有找到相关文章

最新更新