.
你好
使用 Apache Flink 1.8。我有一股来自 Kafka 的记录流作为 JSON 并过滤它们,一切正常。
现在,我想用数据库表中的查找值来丰富 Kafka 中的数据。
这只是创建 2 个流,在第二个流中加载表然后联接数据的情况吗?
数据库表确实会更新,但不经常更新,我想避免在通过流的每条记录上查找数据库。
Flink 有状态,你可以在这里利用它。我做过类似的事情,我从我的查找表中获取每日查询(在我的例子中是批量 Web 服务调用),并将结果通过结果转换为 kafka 主题。此 kafka 主题由需要查找数据的同一服务 flink 作业使用。这两个主题都由相同的值键控,但我使用查找主题将数据存储到键控状态,并且在处理另一个主题时,我会将数据拉回状态。
我有一些额外的逻辑来检查给定密钥是否还没有状态。如果是这种情况,我会向 Web 服务发出异步请求。但是,您可能不需要这样做。
这里需要注意的是,我有用于状态管理的内存,而我的查找表只有大约 3000 万条记录,大约 100 个 GB 分布在 15 个节点上的 45 个插槽中。
[回答评论中的问题] 抱歉,我的回答太长了,所以不得不编辑我的帖子:
我有一个通过批量 REST 调用加载数据的 python 作业(您的只能进行数据查找)。然后,它将数据转换为正确的格式并将其转储到 Kafka 中。然后我的 flink 流有两个来源,一个是"真实数据"主题,另一个是"查找数据"主题。 来自查找数据主题的数据存储在状态中(我使用了 ValueState,因为每个键都映射到单个可能的值,但还有其他状态类型。 每个条目也有 24 小时的到期时间,但这是我的用例。
诀窍在于,将值存储在查找主题的状态中的相同操作必须是将值从"真实"主题中拉出状态的操作。这是因为 flink 状态(甚至是键控状态)与创建它们的运算符相关联。