我使用的是Flink 1.12。我想读取csv,并根据处理时间对窗口进行滚动分组。
代码如下,但没有输出查询sql_tubmle_window
,我想知道的问题在哪里
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.bridge.scala._
import org.apache.flink.types.Row
object Sql017_ProcessTimeAttributeDDLTest {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val tenv = StreamTableEnvironment.create(env)
val ddl =
"""
create table sourceTable(
key STRING,
price DOUBLE,
pt as PROCTIME() ---processing time
) with (
'connector' = 'filesystem',
'path' = 'D:/stock_id_price.csv',
'format' = 'csv'
)
""".stripMargin(' ')
//Create the source table
tenv.executeSql(ddl)
//NOTE: The following query produces correct result
tenv.sqlQuery("select key, price, pt from sourceTable").toAppendStream[Row].print()
//there is no output for the tumble group by query
val sql_tumble_window =
"""
SELECT
TUMBLE_START(pt, INTERVAL '4' second),
TUMBLE_END(pt, INTERVAL '4' second),
sum(price),
'FLAG'
FROM sourceTable
GROUP BY TUMBLE(pt, INTERVAL '4' second)
""".stripMargin(' ')
println("=" * 20)
println("=" * 20)
//There is no output for this sql query
tenv.sqlQuery(sql_tumble_window).toAppendStream[Row].print()
env.execute()
Thread.sleep(20 * 1000)
}
}
问题是作业在窗口有机会启动之前就已经运行完成。
当使用有界输入(如文件(运行Flink流作业时,该作业在完全消耗和处理输入后结束。同时,只要一天中的时间恰好是纪元后4秒的倍数,就会触发4秒长的处理时间窗口——除非CSV文件很长,否则不太可能发生这种情况。
你可能会期望20秒的长睡眠来解决这个问题。但是,在Flink客户端向集群提交作业后,它就会发生睡眠。这不会影响流作业本身的执行。