如何在 Apache Flink 的 StreamTableEnvironment 中实现 timeWindow()?



everyone, 我想在 StreamTableEnvironment 中使用 flink 时间窗口。

我以前使用过 timeWindow(Time.seconds((( 函数和一个来自 kafka 主题的 dataStream。 对于外部问题,我正在将此数据流转换为DataTable并使用sqlQuery((应用SQL查询。

我想用SQL做x个第二次时间窗口聚合,然后将其发送到另一个kafka主题

数据来源:

val stream = senv
.addSource(new FlinkKafkaConsumer[String]("flink", new SimpleStringSchema(), properties))

以前的聚合示例:

val windowCounts = stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))

当前数据表:

val tableA = tableEnv.fromDataStream(parsed, 'user, 'product, 'amount)

在这一部分中,应该有一个查询,该查询每 X 次进行一次聚合

val result = tableEnv.sqlQuery(
s"SELECT * FROM $tableA WHERE amount > 2".stripMargin)

或多或少我的聚合将是计数(Y(超过(除以X( 谢谢!

Ververica对Flink SQL的训练将帮助你解决这个问题。在 中包括一些练习/示例,这些练习/示例在使用 SQL 查询动态表一节中仅涵盖此类查询。

您必须为每个事件建立计时信息的来源,可以是处理时间或事件时间,之后对应于stream.keyBy("x").timeWindow(Time.seconds(5), Time.seconds(5))的查询将如下所示:

SELECT
x,
TUMBLE_END(timestamp, INTERVAL '5' SECOND) AS t,
COUNT(*) AS cnt
FROM Events
GROUP BY
x, TUMBLE(timestamp, INTERVAL '5' SECOND);

有关如何使用时间属性的详细信息,请参阅时间属性简介。

有关使用 Flink SQL 进行窗口的更详细文档,请参阅 Group Windows 上的文档。

相关内容

  • 没有找到相关文章

最新更新