Flink Table API -> Streaming Sink?



I've seen examples that convert a Flink Table object into a DataStream and run StreamExecutionEnvironment.execute.

How would I code and run a continuous query that writes to a streaming sink using the Table API, without converting to a DataStream?

It seems like this must be possible, because otherwise what would be the point of specifying streaming-sink table connectors?

The Table API documentation describes continuous queries and dynamic tables, but most of the actual Java API and code examples seem to use the Table API only for batch processing.

EDIT: To show David Anderson what I'm trying, below are the three Flink SQL CREATE TABLE statements, defined on top of analogous Derby SQL tables.

I see that the JDBC table connector sink supports streaming, but am I not configuring it correctly? I don't see anything that I'm overlooking. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html

FYI, once my toy example is working, I plan to use Kafka in production for the input/output streams and JDBC/SQL for the lookup tables.

CREATE TABLE LookupTableFlink (
  `lookup_key` STRING NOT NULL,
  `lookup_value` STRING NOT NULL,
  PRIMARY KEY (lookup_key) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:derby:memory:myDB;create=false',
  'table-name' = 'LookupTable'
);

CREATE TABLE IncomingEventsFlink (
  `field_to_use_as_lookup_key` STRING NOT NULL,
  `extra_field` INTEGER NOT NULL,
  `proctime` AS PROCTIME()
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:derby:memory:myDB;create=false',
  'table-name' = 'IncomingEvents'
);

CREATE TABLE TransformedEventsFlink (
  `field_to_use_as_lookup_key` STRING,
  `extra_field` INTEGER,
  `lookup_key` STRING,
  `lookup_value` STRING
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:derby:memory:myDB;create=false',
  'table-name' = 'TransformedEvents'
);
String sqlQuery =
    "SELECT\n" +
    "  IncomingEventsFlink.field_to_use_as_lookup_key, IncomingEventsFlink.extra_field,\n" +
    "  LookupTableFlink.lookup_key, LookupTableFlink.lookup_value\n" +
    "FROM IncomingEventsFlink\n" +
    "LEFT JOIN LookupTableFlink FOR SYSTEM_TIME AS OF IncomingEventsFlink.proctime\n" +
    "ON (IncomingEventsFlink.field_to_use_as_lookup_key = LookupTableFlink.lookup_key)\n";
Table joinQuery = tableEnv.sqlQuery(sqlQuery);
// This seems to run, return, and complete, and doesn't appear to run in continuous/streaming mode.
TableResult tableResult = joinQuery.executeInsert("TransformedEventsFlink");
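
FYI, below is a rough sketch of what the production Kafka-backed version of the incoming events table might look like. The topic name, broker address, group id, and JSON format are placeholder assumptions, not an actual configuration, and tableEnv is the same table environment used above.

// Hypothetical Kafka replacement for IncomingEventsFlink; all connector options
// below are placeholders rather than a real production configuration.
tableEnv.executeSql(
    "CREATE TABLE IncomingEventsKafka (\n" +
    "  `field_to_use_as_lookup_key` STRING NOT NULL,\n" +
    "  `extra_field` INTEGER NOT NULL,\n" +
    "  `proctime` AS PROCTIME()\n" +
    ") WITH (\n" +
    "  'connector' = 'kafka',\n" +
    "  'topic' = 'incoming-events',\n" +
    "  'properties.bootstrap.servers' = 'localhost:9092',\n" +
    "  'properties.group.id' = 'toy-example',\n" +
    "  'scan.startup.mode' = 'earliest-offset',\n" +
    "  'format' = 'json'\n" +
    ")");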

You can write to a dynamic table using executeInsert, as in

Table orders = tableEnv.from("Orders");
orders.executeInsert("OutOrders");
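
For instance, here is a minimal, self-contained sketch of a continuous Table API job that never touches the DataStream API; the datagen source and print sink are just stand-ins chosen for illustration. Because the source is unbounded, the INSERT job submitted by executeInsert keeps running until it is cancelled:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ContinuousInsertExample {
    public static void main(String[] args) throws Exception {
        // Pure Table API: no StreamExecutionEnvironment or DataStream conversion.
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        TableEnvironment tableEnv = TableEnvironment.create(settings);

        // Unbounded source table: generates rows forever.
        tableEnv.executeSql(
            "CREATE TABLE Orders (\n" +
            "  order_id BIGINT,\n" +
            "  amount INT\n" +
            ") WITH (\n" +
            "  'connector' = 'datagen',\n" +
            "  'rows-per-second' = '5'\n" +
            ")");

        // Streaming sink table: prints every row it receives.
        tableEnv.executeSql(
            "CREATE TABLE OutOrders (\n" +
            "  order_id BIGINT,\n" +
            "  amount INT\n" +
            ") WITH (\n" +
            "  'connector' = 'print'\n" +
            ")");

        // Submits a continuous INSERT job; await() blocks so a local run
        // keeps going until the job is cancelled.
        tableEnv.from("Orders").executeInsert("OutOrders").await();
    }
}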

The documentation is here.

There is an explanation here.

Code examples can be found here:

// get a StreamTableEnvironment
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section

// Table with two fields (String name, Integer age)
Table table = ...

// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);

// convert the Table into an append DataStream of Tuple2<String, Integer>
//   via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
    Types.STRING(),
    Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
    tableEnv.toAppendStream(table, tupleType);

// convert the Table into a retract DataStream of Row.
//   A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
//   The boolean field indicates the type of the change.
//   True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
    tableEnv.toRetractStream(table, Row.class);
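
As a follow-up, once a Table has been converted this way, the resulting DataStreams are used like any others: attach DataStream sinks and call execute on the StreamExecutionEnvironment. Here is a rough, self-contained sketch; the fromValues data is just an assumed stand-in for the table above.

import static org.apache.flink.table.api.Expressions.row;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class TableToDataStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Stand-in for the "Table with two fields (String name, Integer age)" above.
        Table table = tableEnv.fromValues(
                DataTypes.ROW(
                        DataTypes.FIELD("name", DataTypes.STRING()),
                        DataTypes.FIELD("age", DataTypes.INT())),
                row("alice", 32),
                row("bob", 41));

        // Convert to an append stream and attach an ordinary DataStream sink.
        DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
        dsRow.print();

        // Once a DataStream is involved, the pipeline is launched via env.execute().
        env.execute("table-to-datastream-example");
    }
}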
