我有一个SQL查询,我想解析计算。我已经使用JSQL解析器解析了SQL。现在,我需要计算SQL中的where
子句。我想做它在Flink作为过滤器功能的一部分。基本上是stream.filter(Predicate<Row>)
。Predicate<Row>
是我需要从SQL的where子句的评估中得到的,并将其应用于流记录。
Ex:SELECT COLUMN FROM TABLE WHERE (ac IS NOT NULL AND ac = 'On')
我想解析上面的查询,并给出一个流记录,说ac =on
,我想在该记录上运行上述表达式计算。
我该怎么做呢?
我想尝试使用表达式评估与DFS,但有点困惑如何运行它。任何帮助都是感激的!
如果SQL查询在编译时是已知的,那么将Flink SQL(通过Table API)集成到DataStream应用程序中是更直接的方法。查看文档获取更多信息和示例。
总体方法是将数据流转换为动态表(如果流是方便类型,例如POJO,则可以自动完成),对其应用SQL查询,然后(如果需要)将结果表转换回数据流。
或者只是实现表的整个应用程序API如果你不需要任何的功能网络中是独一无二的。
另一方面,如果查询是动态的,并且直到运行时才提供,则需要执行您所建议的内容。据我所知,其他有类似需求的人已经在基于jvm的运行时中使用了动态语言,比如通过Rhino的Javascript或Groovy。总体方法是使用BroadcastProcessFunction,将动态代码广播到操作符中。