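I am trying to convert a Flink table to a retract stream and then wire that stream into a sink. I was able to do this with the original table planner using CRow, but it seems that Flink's Blink planner no longer supports CRow. Is there a way to accomplish this when using the Blink planner?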
For reference, we were previously able to do this by mapping the retract stream to the CRow type before wiring it into a RetractStreamTableSink.
The unit test below shows what I am trying to accomplish. Note that the commented-out code block works correctly with the old planner.
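The test fails with the following exception, which makes sense, since the retract stream is of type Tuple2<Boolean, Row> while the sink is of type Row. However, I don't see a way to use a Tuple2 retract DataStream with a RetractStreamTableSink<Row>: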
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.sink2 do not match.
Query schema: [f0: BOOLEAN, f1: ROW<`f0` STRING, `f1` STRING>]
Sink schema: [f0: STRING, f1: STRING]
@Test
public void retractStream() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);

    Row row1 = new Row(2);
    row1.setField(0, "1");
    row1.setField(1, "2");
    SingleOutputStreamOperator<Row> source =
        executionEnvironment.fromCollection(ImmutableList.of(row1)).setParallelism(1);
    tableEnvironment.createTemporaryView("table1", source, "key, id");
    Table outputTable = tableEnvironment.sqlQuery("select key, id from table1");
    RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    // This code block below works on Flink planner but fails on Blink planner because Blink treats all non-tuples
    // as POJOs
    // SingleOutputStreamOperator<?> tuple2DataStream = tableEnvironment
    //     .toRetractStream(outputTable, rowTypeInfo)
    //     .map(value -> new CRow(value.f1, value.f0))
    //     .returns(new CRowTypeInfo(rowTypeInfo));

    // Create the retracting stream
    DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(outputTable, rowTypeInfo);
    tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);

    // Create a sink
    TableSchema schema = new TableSchema(rowTypeInfo.getFieldNames(), rowTypeInfo.getFieldTypes());
    CollectingTableSink collectingTableSink = new CollectingTableSink(schema);
    RetractSink retractTableSink = new RetractSink(collectingTableSink);
    tableEnvironment.registerTableSink("sink2", retractTableSink);

    // Wire up the table and the sink (this is what fails)
    tableEnvironment.from("outputTable").insertInto("sink2");
    executionEnvironment.execute();
    System.out.println(collectingTableSink.rows);
}
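I found a workaround for this problem. If you create a shim interface that extends AppendStreamTableSink<Tuple2<Boolean, Row>>, give it default implementations of the RetractStreamTableSink methods, and override the consumeDataStream method as shown below, you can get from Tuple2 back to Row without needing CRow.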
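This is exactly what RetractStreamTableSink is meant for, but something causes Blink to fail when it is used, even when the AppendStreamTableSink and RetractStreamTableSink implementations are otherwise identical (all methods are overridden and equal, and the only difference is the name of the interface you implement). I strongly suspect this is a bug in the Blink planner, but I could not determine where it comes from.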
The snippet that performs the conversion:
@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
    DataStream<Row> filteredAndMapped =
        dataStream.filter(x -> x.f0).map(x -> x.f1).returns(delegate.getOutputType());
    return delegate.consumeDataStream(filteredAndMapped);
}
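
One thing worth keeping in mind about the snippet above: filter(x -> x.f0) keeps only the accumulate messages and silently drops every retraction, so the delegate sink sees an append-only view of the stream. The following is a minimal plain-Java sketch of that difference. It uses no Flink APIs; the class RetractStreamModel and its helper methods are hypothetical names made up for illustration, and a Map.Entry<Boolean, String> stands in for Tuple2<Boolean, Row>.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical model, not Flink code: an entry (flag, value) stands in for
// Tuple2<Boolean, Row>, where flag == true means "accumulate" and
// flag == false means "retract".
public class RetractStreamModel {

    // Helper to build one message of the modelled retract stream.
    static Map.Entry<Boolean, String> msg(boolean accumulate, String value) {
        return new SimpleEntry<>(accumulate, value);
    }

    // Mirrors dataStream.filter(x -> x.f0).map(x -> x.f1):
    // keeps accumulate messages and ignores retractions entirely.
    static List<String> keepAccumulates(List<Map.Entry<Boolean, String>> messages) {
        return messages.stream()
                .filter(Map.Entry::getKey)
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    // What a sink that honours retractions would end up holding:
    // each retract message removes one earlier occurrence of the value.
    static List<String> applyRetractions(List<Map.Entry<Boolean, String>> messages) {
        List<String> state = new ArrayList<>();
        for (Map.Entry<Boolean, String> m : messages) {
            if (m.getKey()) {
                state.add(m.getValue());
            } else {
                state.remove(m.getValue());
            }
        }
        return state;
    }

    public static void main(String[] args) {
        List<Map.Entry<Boolean, String>> messages =
                Arrays.asList(msg(true, "a"), msg(false, "a"), msg(true, "b"));
        System.out.println(keepAccumulates(messages));  // prints [a, b]
        System.out.println(applyRetractions(messages)); // prints [b]
    }
}
```

For a simple projection like the query in the question, no retractions are expected, so both views should agree. For queries that update earlier results (for example a non-windowed aggregation), the filtered stream would still contain the rows that were later retracted.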