I've seen examples that convert a Flink Table object to a DataStream and run StreamExecutionEnvironment.execute.
How would I code + run a continuous query that writes to a streaming sink using the Table API without converting to a DataStream?
It seems like this must be possible, because otherwise what would be the purpose of specifying streaming sink table connectors?
The Table API docs list continuous queries and dynamic tables, but most of the actual Java APIs and code examples seem to only use the Table API for batch.
EDIT: To show David Anderson what I'm trying, here are the three Flink SQL CREATE TABLE statements on top of analogous Derby SQL tables.
I see that the JDBC table connector sink supports streaming, but am I not configuring this correctly? I don't see anything that I'm overlooking. https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/connectors/jdbc.html
FYI, once my toy example is working, I plan on using Kafka in production for the input/output streams and JDBC/SQL for the lookup tables.
CREATE TABLE LookupTableFlink (
`lookup_key` STRING NOT NULL,
`lookup_value` STRING NOT NULL,
PRIMARY KEY (lookup_key) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:derby:memory:myDB;create=false',
'table-name' = 'LookupTable'
);
CREATE TABLE IncomingEventsFlink (
`field_to_use_as_lookup_key` STRING NOT NULL,
`extra_field` INTEGER NOT NULL,
`proctime` AS PROCTIME()
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:derby:memory:myDB;create=false',
'table-name' = 'IncomingEvents'
);
CREATE TABLE TransformedEventsFlink (
`field_to_use_as_lookup_key` STRING,
`extra_field` INTEGER,
`lookup_key` STRING,
`lookup_value` STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:derby:memory:myDB;create=false',
'table-name' = 'TransformedEvents'
);
String sqlQuery =
    "SELECT\n" +
    "  IncomingEventsFlink.field_to_use_as_lookup_key, IncomingEventsFlink.extra_field,\n" +
    "  LookupTableFlink.lookup_key, LookupTableFlink.lookup_value\n" +
    "FROM IncomingEventsFlink\n" +
    "LEFT JOIN LookupTableFlink FOR SYSTEM_TIME AS OF IncomingEventsFlink.proctime\n" +
    "ON (IncomingEventsFlink.field_to_use_as_lookup_key = LookupTableFlink.lookup_key)\n";
Table joinQuery = tableEnv.sqlQuery(sqlQuery);
// This seems to run, return, and complete, and doesn't seem to run in continuous/streaming mode.
TableResult tableResult = joinQuery.executeInsert("TransformedEventsFlink");
You can write to a dynamic table using executeInsert, as in
Table orders = tableEnv.from("Orders");
orders.executeInsert("OutOrders");
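Applied to the setup in the question, a minimal sketch (assuming a Flink 1.12 streaming TableEnvironment and that the three CREATE TABLE statements above have already been registered via executeSql; names like `sqlQuery` come from the question) might look like:

```java
// Sketch only, not tested against a running cluster.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .useBlinkPlanner()
        .inStreamingMode()
        .build();
TableEnvironment tableEnv = TableEnvironment.create(settings);

// ... register the three CREATE TABLE statements with tableEnv.executeSql(...) ...

Table joinQuery = tableEnv.sqlQuery(sqlQuery);

// executeInsert submits the job and returns once submission succeeds;
// with an unbounded source the job itself keeps running on the cluster.
TableResult result = joinQuery.executeInsert("TransformedEventsFlink");

// Optionally block the client until the job terminates (which a truly
// unbounded streaming job never does) -- useful for bounded toy examples:
// result.await();
```

Note that the returned TableResult completing quickly does not mean the query finished; it only means the job was submitted. With a bounded source such as a plain JDBC scan table, however, the job really does read the table once and terminate.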
The documentation is here.
It's explained here.
The code example can be found here:
// get StreamTableEnvironment.
StreamTableEnvironment tableEnv = ...; // see "Create a TableEnvironment" section
// Table with two fields (String name, Integer age)
Table table = ...
// convert the Table into an append DataStream of Row by specifying the class
DataStream<Row> dsRow = tableEnv.toAppendStream(table, Row.class);
// convert the Table into an append DataStream of Tuple2<String, Integer>
// via a TypeInformation
TupleTypeInfo<Tuple2<String, Integer>> tupleType = new TupleTypeInfo<>(
Types.STRING(),
Types.INT());
DataStream<Tuple2<String, Integer>> dsTuple =
tableEnv.toAppendStream(table, tupleType);
// convert the Table into a retract DataStream of Row.
// A retract stream of type X is a DataStream<Tuple2<Boolean, X>>.
// The boolean field indicates the type of the change.
// True is INSERT, false is DELETE.
DataStream<Tuple2<Boolean, Row>> retractStream =
tableEnv.toRetractStream(table, Row.class);
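By contrast, when you do convert to a DataStream as above, nothing runs until you call execute() on the stream environment. A minimal sketch, assuming `env` is the StreamExecutionEnvironment backing `tableEnv` and `dsRow` is the append stream from the snippet above:

```java
// Attach a sink to the converted stream (print() is just for illustration).
dsRow.print();

// Unlike executeInsert, which submits the job itself, a DataStream
// pipeline only starts when execute() is called.
env.execute("table-to-datastream-job");
```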