How to fix the error "Rowtime timestamp is null" in a Flink server



I am trying to run some code on a Flink server, but it fails.

Here is my code:

    DataStream<UserInfo> keyedStream = executionEnvironment
            .addSource(new UserDataSource());
    keyedStream.assignTimestampsAndWatermarks(new MessageWaterEmitter());
    tableEnv.registerDataStream("test", keyedStream, "userId,ticks,startime.rowtime");
    Table table = tableEnv
            .sqlQuery(
                    "SELECT userId,COUNT(userId) as ticks,TUMBLE_END(startime,INTERVAL '5' SECOND) as startime FROM test "
                    + "GROUP BY TUMBLE(startime,INTERVAL '5' SECOND),userId");
    DataStream<Row> userInfoDataStream = tableEnv.toRetractStream(table, Row.class)
            .filter(new FilterFunction<Tuple2<Boolean, Row>>() {
                @Override
                public boolean filter(Tuple2<Boolean, Row> booleanUserInfoTuple2) throws Exception {
                    return booleanUserInfoTuple2.f0;
                }
            }).map(new MapFunction<Tuple2<Boolean, Row>, Row>() {
                @Override
                public Row map(Tuple2<Boolean, Row> booleanUserInfoTuple2) throws Exception {
                    return booleanUserInfoTuple2.f1;
                }
            });
    JdbcSink sink = new JdbcSink();
    userInfoDataStream.addSink(sink);

Here is the error I get:

    java.lang.RuntimeException: Rowtime timestamp is null. Please make sure that a proper TimestampAssigner is defined and the stream environment uses the EventTime time characteristic.
        at DataStreamSourceConversion$651.processElement(Unknown Source)
        at org.apache.flink.table.runtime.CRowOutputProcessRunner.processElement(CRowOutputProcessRunner.scala:70)
        at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:637)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:612)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:592)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:707)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:660)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:727)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:705)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollect(StreamSourceContexts.java:305)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collect(StreamSourceContexts.java:394)
        at flink.source.UserDataSource.run(UserDataSource.java:20)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
        at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
        at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:208)

Can anyone help me solve this? Thanks in advance.

The root cause of this problem is that you call assignTimestampsAndWatermarks on keyedStream but never do anything with the result of that call. If you change the code like this, it will work:

    DataStream<UserInfo> keyedStream = executionEnvironment
            .addSource(new UserDataSource())
            .assignTimestampsAndWatermarks(new MessageWaterEmitter());
    tableEnv.registerDataStream("test", keyedStream, "userId,ticks,startime.rowtime");

Calling assignTimestampsAndWatermarks on a stream does not modify that stream. Instead, it returns a new stream that carries the timestamps and watermarks.
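To make the pitfall concrete without needing a Flink cluster, here is a toy model of the same "return a new object, leave the original alone" behavior. It is only an analogy, not Flink code: ToyStream and its assignTimestamps method are hypothetical names invented for illustration, and a single per-stream timestamp stands in for Flink's per-record timestamps and watermarks.

```java
import java.util.List;

// Hypothetical toy class, NOT part of Flink. It only mimics the fact that
// assignTimestampsAndWatermarks returns a new stream instead of mutating the receiver.
final class ToyStream {
    private final List<String> elements;
    private final Long timestamp; // null means "no timestamp assigned yet"

    ToyStream(List<String> elements, Long timestamp) {
        this.elements = List.copyOf(elements);
        this.timestamp = timestamp;
    }

    // Returns a NEW ToyStream carrying the timestamp; this instance is unchanged.
    ToyStream assignTimestamps(long ts) {
        return new ToyStream(elements, ts);
    }

    Long timestamp() {
        return timestamp;
    }
}

public class Main {
    public static void main(String[] args) {
        ToyStream source = new ToyStream(List.of("a", "b"), null);

        // The pattern from the question: the returned stream is discarded,
        // so "source" still has no timestamp (analogous to "Rowtime timestamp is null").
        source.assignTimestamps(1000L);
        System.out.println("discarded result -> " + source.timestamp());

        // The fix from the answer: keep the returned stream and use it downstream.
        ToyStream withTs = source.assignTimestamps(1000L);
        System.out.println("used result -> " + withTs.timestamp());
    }
}
```

Running it prints `discarded result -> null` followed by `used result -> 1000`, which mirrors why registering the original stream with the table environment leaves the rowtime field empty.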

It can also be fixed like this, which may make it clearer what is going on:

    DataStream<UserInfo> streamWithTSandWMs = keyedStream
            .assignTimestampsAndWatermarks(new MessageWaterEmitter());
    tableEnv.registerDataStream("test", streamWithTSandWMs, "userId,ticks,startime.rowtime");
