我从Kafka读取json数据,并尝试使用flink table API处理数据。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
tEnv.executeSql(
"create table inputTable(" +
"`src_ip` STRING," +
"`src_port` STRING," +
"`bytes_from_src` BIGINT," +
"`pkts_from_src` BIGINT," +
"`ts` TIMESTAMP(2) METADATA FROM 'timestamp'," +
"WATERMARK FOR ts AS ts" +
") WITH (" +
"'connector' = 'kafka'," +
"'topic' = 'test'," +
"'properties.bootstrap.servers' = 'localhost:9092'," +
"'properties.group.id' = 'testGroup'," +
"'scan.startup.mode' = 'earliest-offset'," +
"'format' = 'json'," +
"'json.fail-on-missing-field' = 'true'," +
"'json.ignore-parse-errors' = 'false'" +
")");
Table inputTable = tEnv.from("inputTable");
inputTable.printSchema();
inputTable.execute().print();
Table windowedTable = inputTable
.window(Tumble.over(lit(5).seconds()).on($("ts")).as("w"))
.groupBy($("w"), $("src_ip"))
.select($("w").start().as("window_start"),
$("src_ip"),
$("src_ip").count().as("src_ip_count"),
$("bytes_from_src").avg().as("bytes_from_src_mean")
);
windowedTable.execute().print();
Kafka中有4条记录。flink程序打印出模式信息和输入表,如下所示:
Connected to the target VM, address: '127.0.0.1:62348', transport: 'socket'
(
`src_ip` STRING,
`src_port` STRING,
`bytes_from_src` BIGINT,
`pkts_from_src` BIGINT,
`ts` TIMESTAMP(2) *ROWTIME* METADATA FROM 'timestamp',
WATERMARK FOR `ts`: TIMESTAMP(2) AS `ts`
)
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| op | src_ip | src_port | bytes_from_src | pkts_from_src | ts |
+----+--------------------------------+--------------------------------+----------------------+----------------------+-------------------------+
| +I | 44.38.5.31 | 53159 | 120 | 3 | 2021-08-13 14:59:56.00 |
| +I | 44.38.132.51 | 39409 | 100 | 2 | 2021-08-13 14:58:11.00 |
| +I | 44.38.4.44 | 56758 | 336 | 6 | 2021-08-13 14:59:14.00 |
| +I | 44.38.5.34 | 40001 | 80 | 2 | 2021-08-13 14:57:04.00 |
之后,不打印任何内容。程序没有退出。我在IDEA中运行链接。在这一点上,这似乎是一个黑盒子。没有输出,我不知道如何跟踪flink程序。
如果我注释掉inputTable.execute().print();
行,模式信息将被打印出来,但之后什么也没有,程序也不会退出。
使用的flink版本为1.14.2。
我相信这些记录正在被处理,并且正在被添加到窗口中。但是事件时间窗是由水印触发的,而水印还没有大到足以触发时间窗。要使其工作,您需要处理时间戳超过窗口末尾的事件——即,2021-08-13 15:00:00.00或更大。
对于调试,Flink web仪表板在这种情况下很有帮助。您可以查看事件是否正在被处理,检查水印等。从IDE运行时,请参阅Flink web以获取设置帮助。