未调用Flink表打印连接器



我使用Flink表API将数据从kineesis主题拉到表中。我希望定期将这些数据拉到临时表中,并在其上运行自定义标量函数。然而,我注意到我的标量函数根本没有被调用。

下面是Kinesis表的代码:
this.tableEnv.executeSql("CREATE TABLE transactions (n" +
"    entry  STRING,n" +
"    sequence_number VARCHAR(128) NOT NULL METADATA FROM 'sequence-number' VIRTUAL,n" +
"    shard_id VARCHAR(128) NOT NULL METADATA FROM 'shard-id' VIRTUAL,n" +
"    arrival_time TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL,n" +
"    WATERMARK FOR arrival_time AS arrival_time - INTERVAL '5' SECONDn" +
") WITH (n" +
"    'connector' = 'kinesis',n" +
"    'stream'     = '" + streamName + "',n" +
"    'aws.region' = 'us-west-2', n" +
"    'format'    = 'raw'n" +
")");

然后,我想每秒周期性地调用一个tumble,它从kines中提取数据并更新一个临时表。

我的临时表定义如下:

this.tableEnv.executeSql("CREATE TABLE temporaryTable (n" +
"    entry STRING,n" +
"    sequence_number VARCHAR(128) NOT NULL,n" +
"    shard_id VARCHAR(128) NOT NULL,n" +
"    arrival_time     TIMESTAMP(3),n" +
"    record_list STRING NOT NULL,n" +
"    PRIMARY KEY (shard_id, sequence_number) NOT ENFORCED" +
") WITH (n" +
"   'connector'  = 'print'n" +
")");

然后我有一个代码来做翻滚:

Table inMemoryTable = transactions.
window(Tumble.over(lit(1).second()).on($("arrival_time")).as("log_ts")) 
.groupBy($("entry"), $("sequence_number"), $("log_ts"), $("shard_id"), $("arrival_time"))
.select(
$("entry"),
$("sequence_number"), $("shard_id"), $("arrival_time"),
(call(CustomFunction.class, $("entry")).as("record_list")));
inMemoryTable.executeInsert(temporaryTable)

CustomFunction类是这样的:

public class CustomFunction extends ScalarFunction {
@DataTypeHint("STRING")
public String eval(
@DataTypeHint("STRING") String serializedEntry) throws IOException {
return "asd";
}

当我在Flink中运行这段代码时,我在标准输出中没有得到任何东西,所以显然我错过了一些东西。

下面是Flink UI:

图片作为链接,因为我没有足够的代表

谢谢你的帮助。

我可以让流打印:

driver.tableEnv.getConfig().getConfiguration().setString("table.exec.source.idle", "10000 ms");
driver.env.getConfig().setAutoWatermarkInterval(5000);

最新更新