我正在使用flink 1.11,并尝试在match_recognize所在的位置进行嵌套查询,如下所示:
select * from events where id = (SELECT * FROM events MATCH_RECOGNIZE (PARTITION BY org_id ORDER BY proctime MEASURES A.id AS startId ONE ROW PER MATCH PATTERN (A C* B) DEFINE A AS A.tag = 'tag1', C AS C.tag <> 'tag2', B AS B.tag = 'tag2'));
我得到一个错误:org.apache.calcite.sql.validate.SqlValidatorException: Table 'A' not found
是否不支持此操作?如果不是,还有什么选择?
我通过这样做得到了一些工作:
Table events = tableEnv.fromDataStream(input,
$("sensorId"),
$("ts").rowtime(),
$("kwh"));
tableEnv.createTemporaryView("events", events);
Table matches = tableEnv.sqlQuery(
"SELECT id " +
"FROM events " +
"MATCH_RECOGNIZE ( " +
"PARTITION BY sensorId " +
"ORDER BY ts " +
"MEASURES " +
"this_step.sensorId AS id " +
"AFTER MATCH SKIP TO NEXT ROW " +
"PATTERN (this_step next_step) " +
"DEFINE " +
"this_step AS TRUE, " +
"next_step AS TRUE " +
")"
);
tableEnv.createTemporaryView("mmm", matches);
Table results = tableEnv.sqlQuery(
"SELECT * FROM events WHERE events.sensorId IN (select * from mmm)");
tableEnv
.toAppendStream(results, Row.class)
.print();
出于某种原因,如果不定义一个视图,我就无法让它发挥作用。我不断收到Calcite的错误。
我想您是在尽量避免枚举MATCH_RECOGNIZE的MEASURES子句中A的所有列。您可能想要比较结果执行计划,看看是否有任何显著差异。