I am using Flink 1.12.0. I am trying to convert a DataStream into Table A and run a SQL query on Table A that aggregates over a window, as shown below. I am using the f2 column as its timestamp data type field.
EnvironmentSettings fsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, fsSettings);

Properties props = new Properties();
props.setProperty("bootstrap.servers", BOOTSTRAP_SERVER);
props.setProperty("schema.registry.url", xxx);
props.setProperty("group.id", "test");
props.setProperty("value.deserializer", KafkaAvroDeserializer.class.getName());
props.put("client.id", "flink-kafka-example");

FlinkKafkaConsumer<Avrotest> kafkaConsumer = new FlinkKafkaConsumer<>(
        "test-topic",
        ConfluentRegistryAvroDeserializationSchema.forSpecific(Avrotest.class, prodSchemaRegistryURL),
        props);

DataStreamSource<Avrotest> stream = env.addSource(kafkaConsumer);

Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"), $("f2"));
Table result = tEnv.sqlQuery("SELECT f0, SUM(f1), f2 FROM " + tableA
        + " GROUP BY TUMBLE(f2, INTERVAL '1' HOUR), f1");

tEnv.toAppendStream(result, user.class).print();
env.execute("Flink kafka test");
}
When I execute the code above, I get:
Exception in thread "main" org.apache.flink.table.api.TableException: Window aggregate can only be defined over a time attribute column, but TIMESTAMP(6) encountered.
    at org.apache.flink.table.planner.plan.rules.logical.StreamLogicalWindowAggregateRule.getInAggregateGroupExpression(StreamLogicalWindowAggregateRule.scala:50)
    at org.apache.flink.table.planner.plan.rules.logical.LogicalWindowAggregateRuleBase.onMatch(LogicalWindowAggregateRuleBase.scala:81)
    at org.apache.calcite.plan.AbstractRelOptPlanner.fireRule(AbstractRelOptPlanner.java:333)
    at org.apache.calcite.plan.hep.HepPlanner.applyRule(HepPlanner.java:542)
    at org.apache.calcite.plan.hep.HepPlanner.applyRules(HepPlanner.java:407)
    at org.apache.calcite.plan.hep.HepPlanner.executeInstruction(HepPlanner.java:243)
    at org.apache.calcite.plan.hep.HepInstruction$RuleInstance.execute(HepInstruction.java:127)
In order to perform event-time windowing on a DataStream with the Table API, you need to assign timestamps and watermarks first, and you should do this before calling fromDataStream. With Kafka, it is generally best to call assignTimestampsAndWatermarks directly on the FlinkKafkaConsumer. For more information, see the watermark documentation, the Kafka connector documentation, and the Flink SQL documentation.
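As a sketch of that consumer-level assignment (a Flink 1.12 runtime is assumed; Avrotest and kafkaConsumer come from the question, and getF2() is an assumed accessor returning the event time as epoch milliseconds — adjust to the actual field accessor):

```java
// Sketch only: getF2() is an assumed accessor; Avrotest and kafkaConsumer are from the question.
WatermarkStrategy<Avrotest> strategy = WatermarkStrategy
        .<Avrotest>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, recordTimestamp) -> event.getF2());

// Assigning on the consumer itself makes Flink generate per-partition watermarks,
// which behaves better than per-stream assignment when partitions drift apart.
kafkaConsumer.assignTimestampsAndWatermarks(strategy);

DataStreamSource<Avrotest> stream = env.addSource(kafkaConsumer);
```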
Three steps:

- First, assign timestamps and watermarks with assignTimestampsAndWatermarks. You can choose among several strategies. For example:

WatermarkStrategy<Row> customTime = WatermarkStrategy
        .<Row>forBoundedOutOfOrderness(Duration.ofSeconds(20))
        .withTimestampAssigner((event, timestamp) -> (long) event.getField("f2"));
- Attach the strategy you declared in step 1 to your source:

env.addSource(kafkaConsumer).assignTimestampsAndWatermarks(customTime)
- Declare the table, marking the timestamp field as the rowtime attribute:

Table tableA = tEnv.fromDataStream(stream, $("f0"), $("f1"), $("f2").rowtime());
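For intuition about what TUMBLE(f2, INTERVAL '1' HOUR) then does with that rowtime: each row is assigned to a fixed, epoch-aligned one-hour bucket. A minimal plain-Java sketch of that assignment (no Flink dependency; the window start is computed the way Flink's TimeWindow does for a zero offset):

```java
import java.time.Instant;

public class TumbleDemo {
    // Start of the tumbling window containing ts (epoch millis),
    // for epoch-aligned windows of the given size (Flink's default, offset 0).
    static long windowStart(long ts, long sizeMillis) {
        return ts - (ts % sizeMillis);
    }

    public static void main(String[] args) {
        long hour = 60 * 60 * 1000L;
        long ts = Instant.parse("2021-01-01T10:25:00Z").toEpochMilli();
        long start = windowStart(ts, hour);
        long end = start + hour;
        // An event at 10:25 lands in the window [10:00, 11:00);
        // an event at exactly 11:00 starts the next window.
        System.out.println(Instant.ofEpochMilli(start) + " .. " + Instant.ofEpochMilli(end));
    }
}
```

All rows whose f2 falls in the same bucket are aggregated together, which is why f2 must be a time attribute rather than a plain TIMESTAMP column.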