How to insert into a retract sink from a retract stream using the Blink planner



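I am trying to convert a Flink table into a retract stream and then wire that stream into a sink. I was able to do this with the original table planner by using CRow, but Flink's Blink planner no longer seems to support CRow. Is there a way to accomplish this when using the Blink planner?

For reference, we were previously able to do this by mapping the retract stream to the CRow type before wiring it into a RetractStreamTableSink.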

Below is a unit test that shows what I am trying to accomplish. Note that the commented-out code block works correctly with the old planner.

@Test
public void retractStream() throws Exception {
    EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.createLocalEnvironment();
    StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(executionEnvironment, settings);

    Row row1 = new Row(2);
    row1.setField(0, "1");
    row1.setField(1, "2");
    SingleOutputStreamOperator<Row> source =
        executionEnvironment.fromCollection(ImmutableList.of(row1)).setParallelism(1);
    tableEnvironment.createTemporaryView("table1", source, "key, id");
    Table outputTable = tableEnvironment.sqlQuery("select key, id from table1");
    RowTypeInfo rowTypeInfo = new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    // This code block below works on Flink planner but fails on Blink planner because Blink treats all non-tuples
    // as POJOs
    // SingleOutputStreamOperator<?> tuple2DataStream = tableEnvironment
    //         .toRetractStream(outputTable, rowTypeInfo)
    //         .map(value -> new CRow(value.f1, value.f0))
    //         .returns(new CRowTypeInfo(rowTypeInfo));

    // Create the retracting stream
    DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(outputTable, rowTypeInfo);
    tableEnvironment.createTemporaryView("outputTable", tuple2DataStream);

    // Create a sink
    TableSchema schema = new TableSchema(rowTypeInfo.getFieldNames(), rowTypeInfo.getFieldTypes());
    CollectingTableSink collectingTableSink = new CollectingTableSink(schema);
    RetractSink retractTableSink = new RetractSink(collectingTableSink);
    tableEnvironment.registerTableSink("sink2", retractTableSink);

    // Wire up the table and the sink (this is what fails)
    tableEnvironment.from("outputTable").insertInto("sink2");
    executionEnvironment.execute();
    System.out.println(collectingTableSink.rows);
}

This fails with the following exception. That makes sense, because the retract stream is of type Tuple2<Boolean, Row> while the sink is of type Row, but I cannot see a way to use a Tuple2 retract DataStream together with a RetractStreamTableSink<Row>.

org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink default_catalog.default_database.sink2 do not match.
Query schema: [f0: BOOLEAN, f1: ROW<`f0` STRING, `f1` STRING>]
Sink schema: [f0: STRING, f1: STRING]
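
For readers less familiar with retract streams, here is a minimal plain-Java sketch of how the Boolean flag in Tuple2<Boolean, Row> is meant to be read: true marks an add message and false marks a retract message. RetractMessage and RetractStreamModel are hypothetical stand-ins made up for illustration. Nothing here uses Flink's API, and a real sink would not necessarily keep its state in a list.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for Flink's Tuple2<Boolean, Row>; not a Flink class. */
final class RetractMessage {
    final boolean isAdd;      // plays the role of f0: true = add, false = retract
    final List<String> row;   // plays the role of f1: the row payload

    RetractMessage(boolean isAdd, String... fields) {
        this.isAdd = isAdd;
        this.row = Arrays.asList(fields);
    }
}

final class RetractStreamModel {
    /** Replays the messages in order and returns the rows that remain. */
    static List<List<String>> materialize(List<RetractMessage> messages) {
        List<List<String>> result = new ArrayList<>();
        for (RetractMessage m : messages) {
            if (m.isAdd) {
                result.add(m.row);
            } else {
                result.remove(m.row); // removes one equal row, if present
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<RetractMessage> messages = Arrays.asList(
            new RetractMessage(true, "1", "2"),    // add (1, 2)
            new RetractMessage(false, "1", "2"),   // retract (1, 2)
            new RetractMessage(true, "1", "3"));   // add (1, 3)
        System.out.println(materialize(messages)); // prints [[1, 3]]
    }
}
```

The point of the sketch is only that the flag and the row travel together, which is why the query schema in the exception has a BOOLEAN field in front of the nested ROW.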

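I found a workaround for this problem. If you create a shim interface that extends AppendStreamTableSink<Tuple2<Boolean, Row>>, have it implement the methods from RetractStreamTableSink as defaults, and override the consumeDataStream method as shown below, you can get from Tuple2 back to Row without needing CRow.

This is exactly what RetractStreamTableSink is meant for, but something causes Blink to fail when it is used, even when the AppendStreamTableSink and the RetractStreamTableSink are otherwise identical (all methods overridden and equal, the only difference being the name of the interface you implement). I strongly suspect this is a bug in the Blink planner, but I have not been able to determine where it comes from.

The snippet that performs the conversion: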

@Override
public DataStreamSink<?> consumeDataStream(DataStream<Tuple2<Boolean, Row>> dataStream) {
    DataStream<Row> filteredAndMapped =
        dataStream.filter(x -> x.f0).map(x -> x.f1).returns(delegate.getOutputType());
    return delegate.consumeDataStream(filteredAndMapped);
}
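
To make the effect of that filter-and-map step concrete, here is a minimal plain-Java model of it. FlaggedRow and FilterAndMapModel are hypothetical names made up for illustration and stand in for Tuple2<Boolean, Row> and the DataStream pipeline; the sketch does not use Flink's API.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-in for Tuple2<Boolean, Row>; f0 and f1 mirror the tuple fields. */
final class FlaggedRow {
    final boolean f0;        // true = add message, false = retract message
    final List<String> f1;   // the row payload

    FlaggedRow(boolean f0, String... fields) {
        this.f0 = f0;
        this.f1 = Arrays.asList(fields);
    }
}

final class FilterAndMapModel {
    /** Same shape as dataStream.filter(x -> x.f0).map(x -> x.f1) in the snippet above. */
    static List<List<String>> toRows(List<FlaggedRow> stream) {
        return stream.stream()
            .filter(x -> x.f0)   // keep add messages, drop retract messages
            .map(x -> x.f1)      // unwrap the row payload
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<FlaggedRow> stream = Arrays.asList(
            new FlaggedRow(true, "1", "2"),
            new FlaggedRow(false, "1", "2"),
            new FlaggedRow(true, "1", "3"));
        System.out.println(toRows(stream)); // prints [[1, 2], [1, 3]]
    }
}
```

Note that the retract message is simply dropped, so the delegate still sees the row (1, 2) that was later retracted. That is fine for a query that only ever emits add messages, such as the plain `select key, id from table1` in the test, but for an updating query the delegate would also receive rows that are no longer part of the result.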
