Flink Table API not processing records



I am reading JSON data from Kafka and trying to process it with the Flink Table API.

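import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.lit;

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.Tumble;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KafkaWindowJob {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Kafka-backed source table. `ts` is read from the Kafka record timestamp
        // (metadata, not the JSON payload) and used as the event-time attribute.
        tEnv.executeSql(
                "create table inputTable(" +
                "`src_ip` STRING," +
                "`src_port` STRING," +
                "`bytes_from_src` BIGINT," +
                "`pkts_from_src` BIGINT," +
                "`ts` TIMESTAMP(2) METADATA FROM 'timestamp'," +
                "WATERMARK FOR ts AS ts" +
                ") WITH (" +
                "'connector' = 'kafka'," +
                "'topic' = 'test'," +
                "'properties.bootstrap.servers' = 'localhost:9092'," +
                "'properties.group.id' = 'testGroup'," +
                "'scan.startup.mode' = 'earliest-offset'," +
                "'format' = 'json'," +
                "'json.fail-on-missing-field' = 'true'," +
                "'json.ignore-parse-errors' = 'false'" +
                ")");

        Table inputTable = tEnv.from("inputTable");
        inputTable.printSchema();

        // On an unbounded Kafka source this call keeps printing new rows and does
        // not return, so the statements below are not reached while it runs.
        inputTable.execute().print();

        // 5-second event-time tumbling window, grouped by src_ip.
        Table windowedTable = inputTable
                .window(Tumble.over(lit(5).seconds()).on($("ts")).as("w"))
                .groupBy($("w"), $("src_ip"))
                .select($("w").start().as("window_start"),
                        $("src_ip"),
                        $("src_ip").count().as("src_ip_count"),
                        $("bytes_from_src").avg().as("bytes_from_src_mean"));
        windowedTable.execute().print();
    }
}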

There are 4 records in Kafka. The Flink program prints the schema and the input table as follows:

Connected to the target VM, address: '127.0.0.1:62348', transport: 'socket'
(
`src_ip` STRING,
`src_port` STRING,
`bytes_from_src` BIGINT,
`pkts_from_src` BIGINT,
`ts` TIMESTAMP(2) *ROWTIME* METADATA FROM 'timestamp',
WATERMARK FOR `ts`: TIMESTAMP(2) AS `ts`
)
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| op |                         src_ip |                       src_port |       bytes_from_src |        pkts_from_src |                      ts |
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| +I |                     44.38.5.31 |                          53159 |                  120 |                    3 |  2021-08-13 14:59:56.00 |
| +I |                   44.38.132.51 |                          39409 |                  100 |                    2 |  2021-08-13 14:58:11.00 |
| +I |                     44.38.4.44 |                          56758 |                  336 |                    6 |  2021-08-13 14:59:14.00 |
| +I |                     44.38.5.34 |                          40001 |                   80 |                    2 |  2021-08-13 14:57:04.00 |

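After that, nothing else is printed, and the program does not exit. I am running the job in IntelliJ IDEA. At this point it is a black box to me: with no output, I don't know how to trace what the Flink program is doing.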

If I comment out the inputTable.execute().print(); line, the schema is printed, but after that nothing happens and the program still does not exit.

The Flink version is 1.14.2.

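I believe the records are being processed and added to windows. However, event-time windows are triggered by watermarks, and the watermark has not yet advanced far enough to trigger them. With WATERMARK FOR ts AS ts, the watermark can be no larger than the largest ts seen so far (2021-08-13 14:59:56.00), which still lies inside the latest window, [14:59:55, 15:00:00). To make this work, you need to process an event whose timestamp is at or beyond the end of that window, i.e., 2021-08-13 15:00:00.00 or later.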
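The window arithmetic can be checked with a small stand-alone sketch. It is plain Java, not Flink API, and it assumes the default zero-offset alignment of tumbling windows and that the watermark actually reaches the window operator. It computes the 5-second window that holds the latest record (14:59:56) and applies the rule that an event-time window [start, end) fires once the watermark reaches its last millisecond, end - 1. The class and method names (TumbleWindowCheck, windowStart, fires) are hypothetical.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

public class TumbleWindowCheck {
    // Window size from the question: Tumble.over(lit(5).seconds()).
    static final long SIZE_MS = 5_000L;
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parse "yyyy-MM-dd HH:mm:ss" as UTC epoch millis. UTC is used only for the
    // arithmetic; 5-second boundaries do not depend on the time zone.
    static long toMillis(String ts) {
        return LocalDateTime.parse(ts, FMT).toInstant(ZoneOffset.UTC).toEpochMilli();
    }

    static String format(long millis) {
        return LocalDateTime.ofInstant(Instant.ofEpochMilli(millis), ZoneOffset.UTC).format(FMT);
    }

    // Start of the zero-offset tumbling window that contains tsMillis.
    static long windowStart(long tsMillis) {
        return tsMillis - Math.floorMod(tsMillis, SIZE_MS);
    }

    // A window [start, end) fires once the watermark reaches end - 1 ms.
    static boolean fires(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        long latest = toMillis("2021-08-13 14:59:56");
        long start = windowStart(latest);
        long end = start + SIZE_MS;
        // prints "window: [2021-08-13 14:59:55, 2021-08-13 15:00:00)"
        System.out.println("window: [" + format(start) + ", " + format(end) + ")");
        // With WATERMARK FOR ts AS ts the watermark cannot exceed the latest ts: prints false
        System.out.println("fires at watermark 14:59:56? " + fires(end, latest));
        // An event at 15:00:00 pushes the watermark far enough: prints true
        System.out.println("fires at watermark 15:00:00? "
                + fires(end, toMillis("2021-08-13 15:00:00")));
    }
}
```

The same check explains why the four records alone never produce window output: the largest timestamp available to drive the watermark is 14:59:56, which is short of the 15:00:00 window end.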

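For debugging, the Flink web dashboard is very helpful in situations like this. You can see whether events are being processed, inspect the watermarks, and so on. When running from the IDE, see Flink web for help with the setup.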
