I am using Flink 1.12.0 and have the following code to create a table.
If I run the following SELECT, it produces no results:
SELECT
  TUMBLE_START(pt, INTERVAL '4' second),
  sum(price)
FROM sourceTable
GROUP BY TUMBLE(pt, INTERVAL '4' second)
If I run the following SELECT instead, the results are correct:
tenv.sqlQuery("select key, price, pt from sourceTable").toAppendStream[Row].print()
Can anyone help me figure out why?
package org.example.sql4

import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.TableResult
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row

object Sql017_ProcessTimeAttributeDDLTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    val tenv = StreamTableEnvironment.create(env)

    val ddl =
      """
        create table sourceTable(
          key STRING,
          `date` STRING, ---date is a keyword
          price DOUBLE,
          pt as PROCTIME() ---processing time
        ) with (
          'connector' = 'filesystem',
          'path' = 'd:/stock.csv',
          'format' = 'csv'
        )
      """.stripMargin(' ')
    val result: TableResult = tenv.executeSql(ddl)
    result.print()

    // The following query produces the correct result
    // tenv.sqlQuery("select key, price, pt from sourceTable").toAppendStream[Row].print()

    val sql =
      """
        SELECT
          TUMBLE_START(pt, INTERVAL '4' second),
          sum(price)
        FROM sourceTable
        GROUP BY TUMBLE(pt, INTERVAL '4' second)
      """.stripMargin(' ')

    // This query doesn't produce any result
    tenv.sqlQuery(sql).toAppendStream[Row].print()

    env.execute()
    Thread.sleep(20000)
  }
}
stock.csv contains:
key1,2020-09-16 20:50:15,1
key1,2020-09-16 20:50:12,2
key1,2020-09-16 20:50:11,3
key1,2020-09-16 20:50:18,4
key1,2020-09-16 20:50:13,5
key1,2020-09-16 20:50:20,6
key1,2020-09-16 20:50:14,7
key1,2020-09-16 20:50:22,8
key1,2020-09-16 20:50:40,9
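I think the problem is that your job does not run long enough for the window to fire. The filesystem source reads d:/stock.csv to the end and then the job finishes, which likely takes well under 4 seconds. Processing-time windows are aligned to the epoch: a 4-second tumbling window fires when the wall-clock time (milliseconds since the epoch) reaches the end of the window, i.e. the next multiple of 4 seconds. Unless your job runs for at least 4 seconds, there is no guarantee that any window will fire. Note also that env.execute() blocks until the job has finished, so the Thread.sleep(20000) that follows it does not keep the job alive.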
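To make the timing argument concrete, here is a minimal, standalone sketch of the window arithmetic (no Flink dependency). It assumes offset 0 and non-negative timestamps, and mirrors how Flink's `TimeWindow.getWindowStartWithOffset` aligns windows to the epoch, plus the DataStream `ProcessingTimeTrigger` convention of firing at `window.maxTimestamp()` (end - 1); the SQL window operator is assumed to behave analogously. The object and function names are hypothetical, chosen for illustration only.

```scala
object ProcessingTimeWindowSketch {
  // Start of the tumbling window containing `timestampMs`, aligned to the epoch
  // (offset 0, non-negative timestamps assumed).
  def windowStart(timestampMs: Long, sizeMs: Long): Long =
    timestampMs - (timestampMs % sizeMs)

  // The processing-time timer is assumed to fire at the window's max timestamp,
  // i.e. one millisecond before the window end.
  def fireTime(timestampMs: Long, sizeMs: Long): Long =
    windowStart(timestampMs, sizeMs) + sizeMs - 1

  // True if the window that receives rows at job start fires before the job exits.
  def windowFiresBeforeExit(jobStartMs: Long, jobRunMs: Long, sizeMs: Long): Boolean =
    fireTime(jobStartMs, sizeMs) <= jobStartMs + jobRunMs

  def main(args: Array[String]): Unit = {
    val size = 4000L
    // Hypothetical job that starts 1 s into a 4 s window and runs for 0.5 s:
    // the window [0, 4000) would fire at 3999 ms, after the job has exited.
    println(windowFiresBeforeExit(jobStartMs = 1000L, jobRunMs = 500L, sizeMs = size)) // false
    // Same start, but the job runs for 5 s, so the timer at 3999 ms is reached.
    println(windowFiresBeforeExit(jobStartMs = 1000L, jobRunMs = 5000L, sizeMs = size)) // true
  }
}
```

In the worst case the job starts exactly at a window boundary, so the first timer is about 4 seconds away, which is why a run shorter than the window size gives no guarantee of output.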