Table API in streaming mode does not print results to the console



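Can you tell me why the grouped results of a streaming table are not printed to the console?

The ungrouped results are printed to the console without any problem.

Flink version = 1.15.0 (Scala = 2.12, Java = 8)

Thanks in advance.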

import org.apache.flink.table.api.{DataTypes, EnvironmentSettings, Schema, Table, TableDescriptor, TableEnvironment}

object readKafkaStream02 extends App {
  val settings = EnvironmentSettings
    .newInstance()
    .inStreamingMode()
    // .inBatchMode()
    .build()
  val tEnv = TableEnvironment.create(settings)

  // Schema with an event-time column and a 10-second watermark delay
  val schema = Schema.newBuilder()
  schema.column("id", DataTypes.INT())
  schema.column("type", DataTypes.STRING())
  schema.column("amount", DataTypes.FLOAT())
  schema.column("trx_timestamp", DataTypes.TIMESTAMP(3))
  schema.watermark("trx_timestamp", "trx_timestamp - INTERVAL '10' SECOND")

  tEnv.createTemporaryTable("kafka_stream_input", TableDescriptor.forConnector("kafka")
    .schema(schema.build())
    .format("csv")
    .option("topic", "test_producer01")
    .option("properties.bootstrap.servers", "localhost:9092")
    .option("properties.group.id", "flink-test")
    .option("scan.startup.mode", "latest-offset")
    .build()
  )

  /* tEnv.executeSql(
    """select * FROM TABLE
      |(
      |TUMBLE( DATA => TABLE kafka_stream_input,TIMECOL => DESCRIPTOR(trx_timestamp),SIZE => INTERVAL '2' MINUTES)
      |)
      |""".stripMargin ).print()*/ // works fine

  val temp_table: Table = tEnv.sqlQuery(
    """select  sum(amount)  , window_start , window_end from TABLE
      |(
      |TUMBLE( DATA => TABLE kafka_stream_input,TIMECOL => DESCRIPTOR(trx_timestamp),SIZE => INTERVAL '2' MINUTES)
      |) GROUP BY window_start, window_end ;
      |""".stripMargin )
  tEnv.registerTable("temp_table", temp_table)
  temp_table.execute().print() // just hangs and does not print any output to the console

  /* tEnv.executeSql(
    """select TUMBLE_START(trx_timestamp, INTERVAL '10' MINUTE ) , sum(amount)  from kafka_stream_input
      |group by TUMBLE(trx_timestamp, INTERVAL '10' MINUTE )
      |""".stripMargin).print()*/
}

Following the question "Flink SQL query does not return results", the setting below makes the grouped query work as expected.

// Set this before executing the windowed query
val tEnv_config = tEnv.getConfig()

tEnv_config.set("table.exec.source.idle-timeout", "1000ms")
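
A likely explanation for why this helps, sketched as a toy model rather than actual Flink code: a source that reads several Kafka partitions emits the minimum of the per-partition watermarks, so a partition that never receives data keeps the combined watermark from advancing and the tumbling window never closes. Once the idle timeout expires, that partition is marked idle and ignored. The names `Partition`, `combinedWatermark` and `windowFires` below are hypothetical, and the firing rule is simplified to "watermark has reached the window end".

```scala
// Toy model of watermark propagation with idle partitions (not Flink code).
object IdleWatermarkSketch {
  // Hypothetical per-partition state: latest watermark in epoch millis, plus an idle flag.
  final case class Partition(watermark: Long, idle: Boolean)

  // Combined watermark: minimum over the partitions that are not idle.
  // Returns None when every partition is idle.
  def combinedWatermark(partitions: Seq[Partition]): Option[Long] = {
    val active = partitions.filterNot(_.idle).map(_.watermark)
    if (active.isEmpty) None else Some(active.min)
  }

  // Simplified rule: a window fires once the combined watermark reaches its end.
  def windowFires(windowEnd: Long, partitions: Seq[Partition]): Boolean =
    combinedWatermark(partitions).exists(_ >= windowEnd)

  def main(args: Array[String]): Unit = {
    val windowEnd = 120000L // end of the first 2-minute window

    // One partition has data, the other has never produced a watermark.
    val noIdleHandling = Seq(Partition(130000L, idle = false), Partition(Long.MinValue, idle = false))
    // Same situation after the idle timeout has marked the empty partition idle.
    val withIdleTimeout = Seq(Partition(130000L, idle = false), Partition(Long.MinValue, idle = true))

    println(windowFires(windowEnd, noIdleHandling))  // false: the empty partition holds the watermark back
    println(windowFires(windowEnd, withIdleTimeout)) // true: the idle partition is ignored
  }
}
```

This would also be consistent with the ungrouped query printing fine: it only forwards rows, while the grouped query has to wait for the watermark to pass the end of each window before emitting a result.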
