我使用Flink表API将数据从kineesis主题拉到表中。我希望定期将这些数据拉到临时表中,并在其上运行自定义标量函数。然而,我注意到我的标量函数根本没有被调用。
下面是Kinesis表的代码:this.tableEnv.executeSql("CREATE TABLE transactions (n" +
" entry STRING,n" +
" sequence_number VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL,n" +
" shard_id VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,n" +
" arrival_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,n" +
" WATERMARK FOR arrival_time AS arrival_time - INTERVAL '5' SECONDn" +
") WITH (n" +
" 'connector' = 'kinesis',n" +
" 'stream' = '" + streamName + "',n" +
" 'aws.region' = 'us-west-2', n" +
" 'format' = 'raw'n" +
")");
然后,我想每秒周期性地调用一个tumble,它从kines中提取数据并更新一个临时表。
我的临时表定义如下:
this.tableEnv.executeSql("CREATE TABLE temporaryTable (n" +
" entry STRING,n" +
" sequence_number VARCHAR(128) NOT NULL,n" +
" shard_id VARCHAR(128) NOT NULL,n" +
" arrival_time TIMESTAMP(3),n" +
" record_list STRING NOT NULL,n" +
" PRIMARY KEY (shard_id, sequence_number) NOT ENFORCED" +
") WITH (n" +
" 'connector' = 'print'n" +
")");
然后我有一个代码来做翻滚:
Table inMemoryTable = transactions.
window(Tumble.over(lit(1).second()).on($("arrival_time")).as("log_ts"))
.groupBy($("entry"), $("sequence_number"), $("log_ts"), $("shard_id"), $("arrival_time"))
.select(
$("entry"),
$("sequence_number"), $("shard_id"), $("arrival_time"),
(call(CustomFunction.class, $("entry")).as("record_list")));
inMemoryTable.executeInsert(temporaryTable)
CustomFunction类是这样的:
public class CustomFunction extends ScalarFunction {
@DataTypeHint("STRING")
public String eval(
@DataTypeHint("STRING") String serializedEntry) throws IOException {
return "asd";
}
当我在Flink中运行这段代码时,我在标准输出中没有得到任何东西,所以显然我错过了一些东西。
下面是Flink UI:
图片作为链接,因为我没有足够的代表
谢谢你的帮助。
我可以让流打印:
driver.tableEnv.getConfig().getConfiguration().setString("table.exec.source.idle", "10000 ms");
driver.env.getConfig().setAutoWatermarkInterval(5000);