下面是我使用flink sql API连接两个表的代码
tEnv.createTemporaryView("A", streamA,"speed_sum,cnt,window_start_time,window_end_time");
tEnv.createTemporaryView("B",streamB,"speed_sum,cnt,window_start_time,window_end_time");
String execSQL1 = "select A.speed_sum+COALESCE(B.speed_sum,0.0), " +
"A.cnt+COALESCE(B.cnt,0), " +
"A.window_start_time, A.window_end_time " +
"from A " +
"left join B on A.window_start_time = B.window_start_time ";
Table table = tEnv.sqlQuery(execSQL1);
DataStream<Tuple2<Boolean, Row>> streamResult = tEnv.toRetractStream(table, Row.class).;
streamResult.print("streamResult");
的输出是这样的:
streamA-----------(5078.000000,199,1635333650000,1635333660000)
streamB-----------(1721.388891,111,1635333650000,1635333660000)
streamResult:3> (true,5078.0,199,1635333650000,1635333660000) // drop
streamResult:3> (false,5078.0,199,1635333650000,1635333660000) // drop
streamResult:3> (true,6799.388891220093,310,1635333650000,1635333660000) // want to save
可以看到,toRetractStream
API将生成三条记录。我想知道如何得到最后一条记录,它正确地将A.speed_sum
和B.speed_sum
(A.cnt
和B.cnt
)相加。
一些流SQL查询,比如JOIN,会产生一个更新流。考虑到流媒体的连续性和无界性,Flink没有办法知道什么时候"结束"。结果已达到。
如果你在有限的输入上执行这个查询,你可以在批处理模式下执行,然后只打印最终结果。
在一些流用例中,您可以使用时间属性而不是时间戳,然后Flink SQL计划器能够推断出某些查询的结果何时完成。例如,这就是Flink SQL中的窗口能够产生追加流而不是更新流的方式。您的查询几乎是一个间隔连接。如果它是一个间隔连接,那么结果流将是一个追加流,并且您不必处理这些回缩。