我的需求是围绕flink中的sql查询结果处理或构建一些逻辑。为了简单起见,假设我有两个sql查询,它们在不同的窗口大小和一个事件流上运行。我的问题是
- a(我将如何知道这是哪个查询结果
- b(如何知道执行查询的结果是多少行?我需要这些信息,因为我必须构建一个带有事件列表的通知消息,这些事件是查询结果的一部分
DataStream<Event> ds = ...
String query = "select id, key" +
" from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key ";
String query1 = "select id, key" +
" from eventTable GROUP BY TUMBLE(rowTime, INTERVAL '1' DAY), id, key ";
List<String> list = new ArrayList<>();
list.add(query);
list.add(query1);
tabEnv.createTemporaryView("eventTable", ds, $("id"), $("timeLong"), $("key"),$("rowTime").rowtime());
for(int i =0; i< list.size(); i++ ){
Table result = tabEnv.sqlQuery(list.get(i));
DataStream<Tuple2<Boolean, Row>> dsRow = tabEnv.toRetractStream(result, Row.class);
dsRow.process(new ProcessFunction<Tuple2<Boolean, Row>, Object>() {
List<Row> listRow = new ArrayList<>();
@Override
public void processElement(Tuple2<Boolean, Row> booleanRowTuple2, Context context, Collector<Object> collector) throws Exception {
listRow.add(booleanRowTuple2.f1);
}
});
}
感谢你的帮助。感谢Ashutosh
要区分哪些结果来自哪个查询,可以在查询本身中为每个查询包含一个标识符,例如
SELECT '10sec', id, key FROM eventTable GROUP BY TUMBLE(rowTime, INTERVAL '10' SECOND), id, key
确定结果表中的行数比较棘手。一个问题是,流式查询的结果数量没有最终答案。但是,在处理结果的地方,似乎可以计算行数。
或者,我还没有尝试过,但也许您可以使用类似row_number() over(order by tumble_rowtime(rowTime, interval '10' second))
的东西来用计数器注释结果的每一行。