你能告诉我为什么流表的分组结果没有打印到控制台的结果吗。
未分组的结果正在打印到控制台,没有任何问题。
Flink版本=1.15.0(SCALA=2.12((JAVA=8(
提前谢谢。
import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Schema, Table, TableDescriptor, TableEnvironment}
object readKafkaStream02 extends App {
val settings = EnvironmentSettings
.newInstance()
.inStreamingMode()
// .inBatchMode()
.build()
val tEnv = TableEnvironment.create(settings);
val schema = Schema.newBuilder()
schema.column("id", DataTypes.INT())
schema.column("type", DataTypes.STRING())
schema.column("amount", DataTypes.FLOAT())
schema.column("trx_timestamp", DataTypes.TIMESTAMP(3))
schema.watermark("trx_timestamp", "trx_timestamp - INTERVAL '10' SECOND")
tEnv.createTemporaryTable("kafka_stream_input", TableDescriptor.forConnector("kafka")
.schema(schema.build())
.format("csv")
.option("topic","test_producer01")
.option("properties.bootstrap.servers","localhost:9092")
.option("properties.group.id","flink-test")
.option("scan.startup.mode" , "latest-offset")
.build()
)
/* tEnv.executeSql(
"""select * FROM TABLE
|(
|TUMBLE( DATA => TABLE kafka_stream_input,TIMECOL => DESCRIPTOR(trx_timestamp),SIZE => INTERVAL '2' MINUTES)
|)
|""".stripMargin ).print()*/ ========> works fine
val temp_table : Table = tEnv.sqlQuery(
"""select sum(amount) , window_start , window_end from TABLE
|(
|TUMBLE( DATA => TABLE kafka_stream_input,TIMECOL => DESCRIPTOR(trx_timestamp),SIZE => INTERVAL '2' MINUTES)
|) GROUP BY window_start, window_end ;
|""".stripMargin )
tEnv.registerTable("temp_table",temp_table)
temp_table.execute().print() `enter code here`======> just hung and not printing any o/p to console
/* tEnv.executeSql(
"""select TUMBLE_START(trx_timestamp, INTERVAL '10' MINUTE ) , sum(amount) from kafka_stream_input
|group by TUMBLE(trx_timestamp, INTERVAL '10' MINUTE )
|""".stripMargin).print()*/
}
如下:Flink SQL查询不返回结果
按预期工作。
val tEnv_config=tEnv.getConfig((;
tEnv_config.set("table.exec.source.iidle timeout","1000ms"(;