Apache Flink 1.5.2: Rowtime timestamp is null



I'm running some queries with the following code:

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
DataStream<Row> ds = SourceHelp.builder().env(env).consumer010(MyKafka.builder().build().kafkaWithWaterMark2())
        .rowTypeInfo(MyRowType.builder().build().typeInfo())
        .build().source4();
//,proctime.proctime,rowtime.rowtime
// sql1: non-windowed GROUP BY with MAX(rowtime); this is the query that throws
String sql1 = "select a,b,max(rowtime) as rowtime from user_device group by a,b";
DataStream<Row> ds2 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device").fields("a,b,rowtime.rowtime")
        .rowTypeInfo(MyRowType.builder().build().typeInfo13())
        .sql(sql1).in(ds).build().result();
ds2.print();
// sql2: HOP (sliding) window of 30 s that advances every 5 s, applied to the result of sql1
// String sql2 = "select a,count(b) as b from user_device2 group by a";
String sql2 = "select a,count(b) as b,HOP_END(rowtime,INTERVAL '5' SECOND,INTERVAL '30' SECOND) as c "
        + "from user_device2 group by HOP(rowtime, INTERVAL '5' SECOND, INTERVAL '30' SECOND),a";
DataStream<Row> ds3 = TableHelp.builder().tableEnv(tableEnv).tableName("user_device2").fields("a,b,rowtime.rowtime")
        .rowTypeInfo(MyRowType.builder().build().typeInfo14())
        .sql(sql2).in(ds2).build().result();
ds3.print();
env.execute("test");
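For reference, sql2 groups rows into HOP (sliding) windows that are 30 seconds long and advance every 5 seconds. Each row therefore belongs to 30 / 5 = 6 overlapping windows, and HOP_END(...) returns the exclusive end of each window. The plain-Java sketch below models that assignment. It follows the same loop as Flink's sliding event-time window assigner, but it assumes a zero offset and non-negative millisecond timestamps. `HopWindows` and `assign` are hypothetical names for illustration, not Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Standalone model (not Flink code) of HOP / sliding window assignment. */
final class HopWindows {

    /** A window [start, end) in milliseconds; end is the value HOP_END reports. */
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        public String toString() {
            return "[" + start + ", " + end + ")";
        }
    }

    /**
     * Returns every window of length sizeMs, advancing by slideMs, that contains timestampMs.
     * Assumes timestampMs >= 0 and a zero window offset.
     */
    static List<Window> assign(long timestampMs, long sizeMs, long slideMs) {
        List<Window> windows = new ArrayList<>();
        // Latest slide-aligned window start that is <= the timestamp.
        long lastStart = timestampMs - (timestampMs % slideMs);
        // Step back one slide at a time while [start, start + size) still covers the timestamp.
        for (long start = lastStart; start > timestampMs - sizeMs; start -= slideMs) {
            windows.add(new Window(start, start + sizeMs));
        }
        return windows;
    }

    public static void main(String[] args) {
        // A row at 62.3 s with slide INTERVAL '5' SECOND and size INTERVAL '30' SECOND.
        for (Window w : assign(62_300L, 30_000L, 5_000L)) {
            System.out.println(w);
        }
    }
}
```

For a row at 62.3 s, this prints six windows. They run from [60000, 90000) down to [35000, 65000), so the row is counted once in each window.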

Note: in sql1 I apply the max function to rowtime. It doesn't work and throws the following exception:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.
	at org.apache.flink.runtime.minicluster.MiniCluster.executeJobBlocking(MiniCluster.java:625)
	at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment.java:123)
	at com.aicaigroup.water.WaterTest.testRowtimeWithMoreSqls5(WaterTest.java:158)
	at com.aicaigroup.water.WaterTest.main(WaterTest.java:20)
Caused by: java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.
	at DataStreamSourceConversion$24.processElement(Unknown Source)
	at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:67)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:628)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:581)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.aicaigroup.TableHelp$1.processElement(TableHelp.java:42)
	at com.aicaigroup.TableHelp$1.processElement(TableHelp.java:39)
	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
	at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:558)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:533)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:513)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:679)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:657)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:151)
	at org.apache.flink.table.runtime.aggregate.GroupAggProcessFunction.processElement(GroupAggProcessFunction.scala:39)
	at org.apache.flink.streaming.api.operators.LegacyKeyedProcessOperator.processElement(LegacyKeyedProcessOperator.java:88)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:202)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:104)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:306)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
	at java.lang.Thread.run(Thread.java:748)
2018-09-17 09:51:53.679 [Kafka 0.10 Fetcher for Source: Custom Source -> Map -> from: (a, b, rowtime) -> select: (a, b, CAST(rowtime) AS rowtime) (2/8)] INFO o.a.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator 172.16.11.91:9092 (id: 2147483647 rack: null) for group test.

Then I tried changing sql1 to "select a,b,rowtime from user_device", and it worked. So how do I fix the error? The first SQL needs a GROUP BY, and the second SQL needs a time window on rowtime. Thanks.
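The two versions of sql1 differ in one important way. "select a,b,rowtime" only passes rows through, so its result is append-only. A non-windowed "group by a,b" with max(rowtime), by contrast, produces a continuously updating result: a later row can raise the maximum for a key whose result was already emitted. The plain-Java model below illustrates that behavior with retract (-) and add (+) messages. These are similar in spirit to the (false/true, row) pairs that StreamTableEnvironment.toRetractStream produces. The model is conceptual only and is not Flink's implementation. `UpdatingMaxByKey` and `onRow` are hypothetical names, and in this model a row that does not raise the maximum emits nothing.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Conceptual model (not Flink code) of "select a, b, max(rowtime) ... group by a, b" on an unbounded stream. */
final class UpdatingMaxByKey {

    // Current max(rowtime) per (a, b) key.
    private final Map<String, Long> maxByKey = new HashMap<>();

    /**
     * Consumes one input row and returns the change messages for the result table:
     * "-(...)" retracts the previous result for the key, "+(...)" adds the new one.
     */
    List<String> onRow(String a, String b, long rowtime) {
        List<String> changes = new ArrayList<>();
        String key = a + "," + b;
        Long previous = maxByKey.get(key);
        if (previous != null && previous >= rowtime) {
            return changes; // maximum unchanged: no update in this model
        }
        if (previous != null) {
            changes.add("-(" + key + "," + previous + ")"); // withdraw the outdated result
        }
        maxByKey.put(key, rowtime);
        changes.add("+(" + key + "," + rowtime + ")"); // emit the updated result
        return changes;
    }
}
```

Feeding rows for key (x, y) at 1000 and then 3000 first emits "+(x,y,1000)". The second row then emits "-(x,y,1000)" followed by "+(x,y,3000)". A projection like "select a,b,rowtime" would never need the retraction.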

I started with 1.6 and ran into a problem similar to yours. I solved it with these steps:

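  • Use assignTimestampsAndWatermarks with the default, standard implementation BoundedOutOfOrdernessTimestampExtractor. You need to implement the extractTimestamp function to extract the timestamp value, and pass the maximum out-of-orderness interval to its constructor.
  • Append ",proctime.proctime,rowtime.rowtime" to the end of the field list. (I use fromDataStream in Flink 1.6 to convert the stream into a table.)
  • If you want to use an existing field as rowtime, declare it in place. For example, if the source fields are "a,clicktime,c", you can declare "a,clicktime.rowtime,c".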
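The first step can be pictured with a small plain-Java model of the watermark rule behind BoundedOutOfOrdernessTimestampExtractor. The extractor remembers the highest timestamp returned by extractTimestamp and reports a watermark equal to that maximum minus the out-of-orderness bound passed to its constructor. `BoundedOutOfOrdernessModel`, `onElement` and `currentWatermark` are hypothetical names for this sketch.

In a real job you would instead:

  • subclass BoundedOutOfOrdernessTimestampExtractor<Row>;
  • pass something like Time.seconds(5) to its constructor;
  • implement extractTimestamp(Row);
  • hand the instance to DataStream.assignTimestampsAndWatermarks(...).

Which Row field holds the event time is an assumption you would adapt to your data.

```java
/**
 * Standalone model (not Flink code) of bounded-out-of-orderness watermarking:
 * watermark = highest event time seen - allowed out-of-orderness, never moving backwards.
 */
final class BoundedOutOfOrdernessModel {

    private final long maxOutOfOrdernessMs;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;

    BoundedOutOfOrdernessModel(long maxOutOfOrdernessMs) {
        this.maxOutOfOrdernessMs = maxOutOfOrdernessMs;
        // Start low enough that (max - bound) does not underflow before the first element.
        this.currentMaxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMs;
    }

    /** Plays the role of extractTimestamp: returns the event time and tracks the maximum seen. */
    long onElement(long eventTimeMs) {
        if (eventTimeMs > currentMaxTimestamp) {
            currentMaxTimestamp = eventTimeMs;
        }
        return eventTimeMs;
    }

    /** Periodic watermark: max timestamp minus the bound, never lower than the previous one. */
    long currentWatermark() {
        long candidate = currentMaxTimestamp - maxOutOfOrdernessMs;
        if (candidate >= lastEmittedWatermark) {
            lastEmittedWatermark = candidate;
        }
        return lastEmittedWatermark;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessModel wm = new BoundedOutOfOrdernessModel(5_000L);
        long[] eventTimes = {10_000L, 14_000L, 12_000L, 20_000L};
        for (long t : eventTimes) {
            wm.onElement(t);
            System.out.println("event " + t + " -> watermark " + wm.currentWatermark());
        }
    }
}
```

With a 5-second bound, the watermarks after each event are 5000, 9000, 9000 and 15000. The out-of-order event at 12000 does not lower the watermark, and because 12000 is still above 9000 it is not treated as late.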

Hope it helps.
