everyone, 我想在 StreamTableEnvironment 中使用 flink 时间窗口。
我以前使用过 timeWindow(Time.seconds((( 函数和一个来自 kafka 主题的 dataStream。 对于外部问题,我正在将此数据流转换为DataTable并使用sqlQuery((应用SQL查询。
我想用SQL做x个第二次时间窗口聚合,然后将其发送到另一个kafka主题
数据来源:
val stream = senv
.addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))
以前的聚合示例:
val windowCounts = stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))
当前数据表:
val tableA = tableEnv.fromDataStream(parsed, 'user, 'product, 'amount)
在这一部分中,应该有一个查询,该查询每 X 次进行一次聚合
val result = tableEnv.sqlQuery(
s"SELECT * FROM $tableA WHERE amount > 2".stripMargin)
或多或少我的聚合将是计数(Y(超过(除以X( 谢谢!
Ververica对Flink SQL的训练将帮助你解决这个问题。在 中包括一些练习/示例,这些练习/示例在使用 SQL 查询动态表一节中仅涵盖此类查询。
您必须为每个事件建立计时信息的来源,可以是处理时间或事件时间,之后对应于stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))
的查询将如下所示:
SELECT
x,
TUMBLE_END(timestamp, INTERVAL '5' SECOND) AS t,
COUNT(*) AS cnt
FROM Events
GROUP BY
x, TUMBLE(timestamp, INTERVAL '5' SECOND);
有关如何使用时间属性的详细信息,请参阅时间属性简介。
有关使用 Flink SQL 进行窗口的更详细文档,请参阅 Group Windows 上的文档。