我是 flink 的新手,想了解如何使用 FLINK 运行我的用例: 应用程序具有三个输入数据源 a( 历史数据 b( 从 Kafka 获取所有现场事件 c( 获取将具有触发条件的控制事件
由于应用程序正在处理历史数据,因此我认为我将合并历史数据和实时数据,并将在该流上创建一个表。
要触发事件,我们必须在控制事件的帮助下编写 SQL 查询,该事件是输入源并保存 where 子句。
我的问题是构建SQL查询,因为数据在Stream中,当我做类似的事情时
DataStream<ControlEvent> controlEvent
controlEvent.map(new FlatMapFunction(String, String)
{
@override
public String flatMap(String s, Collector<String> coll)
{
tableEnv.execute("select * from tableName"); /// throw serialization exception
}
});
它抛出不序列化异常本地执行环境
Flink SQL 还不支持这种动态查询注入。
更新:
鉴于您所说的需求 - 查询中的变体将受到限制 - 您可以做的是使用DataStream API而不是SQL来实现这一点。这可能是一个保存一些键控状态的KeyedBroadcastProcessFunction
,您可以广播查询/查询的更新。
看看 欺诈检测演示 作为如何使用 Flink 构建这种东西的示例。