我有一个数据库,包含表T1(id,name,age(和T2(id,subject(。Flink使用debezium之类的东西作为事件流从数据库接收所有更新。这些表相互关联,可以通过将T1与T2连接到id来提取所需数据。目前,数据库的整个状态存储在Flink MapState中,以id为密钥。现在的问题是,我需要根据T1中的name选择行,而不使用id。似乎我需要T1(名称(上的索引,以使其更快。有没有什么方法可以自动为它建立索引,而不必为每个表手动创建索引。推荐的方法是什么?。我知道表上的SQL流,但我需要对表更新的支持。顺便说一下,我在Scala中使用Flink。如有任何建议,我们将不胜感激。
我的理解是,您正在连接T1和T2,并以键控状态存储这两个流中数据的一些表示形式(在MapState中(。听起来T1和T2是随着时间的推移而演变的,您希望能够通过指定名称随时交互查询联接。
一个想法是以您想要选择的名称进行广播,并使用KeyedBroadcastProcessFunction来处理它们。在其processBroadcastElement方法中,您可以使用ctx.applyToKeyedState通过从MapState记录中提取数据来计算结果(这些数据必须保存在该运算符中(。我怀疑您会想使用这些名称作为这些MapState记录中的键,这样您就不必迭代每个映射中的所有条目来查找感兴趣的项目。
您将在中找到一个与此模式有些相似的示例https://training.data-artisans.com/exercises/ongoingRides.html.