Flink Sql match_recognize返回不同的时间(不作为输入事件时间)



我有一组带有时间戳的流数据,在flink sql处理后,返回结果显示与传入(事件时间(数据时间戳不匹配的不同时间。这是我的代码数据,其中最后两行表示 flink sql 的输出,它显示了不同的时间。我们是否需要添加任何特定代码才能获得正确的时间?

3> (1001,j001,Goa Vacation,ctx[],action[],NULL,2020-06-05 06:00:20.0,Entered,Airport Recipe,2020-07-07 12:32:10.0) 3> (1001,j001,Goa Vacation,ctx[],action[],NULL,2020-06-05 06:00:20.0,Entered,Airport Recipe,2020-07-07 12:32:10.0) 3> (1001,j001,Goa Vacation,ctx[],action[],NULL,2020-06-05 06:05:10.0,Conveyer,Airport Recipe,2020-07-07 12:32:10.0) 3> (1001,j001,Goa Vacation,ctx[],action[],NULL,2020-06-05 06:05:10.0,Conveyer,Airport Recipe,2020-07-07 12:32:10.0) 3> (1001,j001,Goa Vacation,ctx[],action[],NULL,2020-06-05 06:10:13.0,Waiting,Airport Recipe,2020-07-07 12:32:10.0) 3> (1001,j001,Goa Vacation,ctx[],action[],NULL,2020-06-05 06:10:13.0,Waiting,Airport Recipe,2020-07-07 12:32:10.0) 3> (1001,j001,Goa Vacation,ctx[],action[],NULL,2020-06-05 06:30:09.0,Security Scan,Airport Recipe,2020-07-07 12:32:10.0) 3> (1001,j001,Goa Vacation,ctx[],action[],NULL,2020-06-05 06:30:09.0,Security Scan,Airport Recipe,2020-07-07 12:32:10.0) 3> (1001,j001,Goa Vacation,ctx[],action[],NULL,2020-06-05 06:50:54.0,Waiting,Airport Recipe,2020-07-07 12:32:10.0) 3> (1001,j001,Goa Vacation,ctx[],action[],NULL,2020-06-05 06:50:54.0,Waiting,Airport Recipe,2020-07-07 12:32:10.0) 3> (1001,j001,Goa Vacation,ctx[],action[],NULL,2020-06-05 07:00:34.0,Boarding,Airport Recipe,2020-07-07 12:32:10.0) 3> (1001,j001,Goa Vacation,ctx[],action[],NULL,2020-06-05 07:00:34.0,Boarding,Airport Recipe,2020-07-07 12:32:10.0) 4> (1001,2020-06-05 00:30:49.999,2020-06-05 00:35:49.999,Entered,Conveyer) 4> (1001,2020-06-05 00:31:39.999,2020-06-05 00:35:49.999,Entered,Conveyer)

Table matchResult = bsTableEnv
.sqlQuery("Select Id, StartTime, EndTime, EventName, EventB from eb_user_journey "
+ "MATCH_RECOGNIZE" + "( PARTITION BY user_id " + "ORDER BY event_time " + "MEASURES "
+ "A.user_id AS Id, " + "FIRST(A.event_time) AS StartTime, "
+ "FIRST(B.event_time) AS EndTime, A.event_name AS EventName, B.event_name AS EventB"
+ " ONE ROW PER MATCH " + " AFTER MATCH SKIP TO NEXT ROW " + "  PATTERN (A+ B) " + " DEFINE"
+ " A as A.event_name = 'Entered' , B as B.event_name = 'Conveyer')");
matchResult.printSchema();
TupleTypeInfo<Tuple5<String, Timestamp, Timestamp, String, String>> tupleType = new TupleTypeInfo<>(
Types.STRING(), Types.SQL_TIMESTAMP(), Types.SQL_TIMESTAMP(), Types.STRING(), Types.STRING());
DataStream<Tuple5<String, Timestamp, Timestamp, String, String>> cassandraDSTP = bsTableEnv
.toAppendStream(matchResult, tupleType); `

看起来有些东西正在将时间戳转换为 UTC。您应该能够在 select 语句中使用这些时态函数之一来转换回来,可能是 LOCALTIMESTAMP 或 CONVERT_TZ。

相关内容

  • 没有找到相关文章

最新更新