在 Apache Flink 中手动更新状态的最佳方法是什么?



我在一个股票市场项目中使用Apache Flink来计算当前的价格变化。公式为

price_change = (current_price - previous_close_price) / previous_close_price

previous_close_price是证券在前一交易日的收盘价。每天开市前,我都需要更新previous_close_price

现在我想出了几个解决方案,但我不知道哪一个是最好的。

  1. previous_close_price存储在redis中,并在每次计算中获取价格。更新价格既简单又灵活,但这种解决方案可能会影响性能。

  2. 将状态的TTL设置为1天。在旧状态过期时获取新状态。但它并不灵活,因为TTL是硬编码的。

  3. 广播状态模式。我不确定这个解决方案是否有效。

  4. 给flink发个特别的信息。当flink收到消息时,它会更新previous_close_price

欢迎提出任何建议。

我建议在#4:上使用一个变体

有两个来源,一个只用于收盘价,另一个用于交易流。通过安全性对两个流进行加密,并使用CoProcessFunction将它们连接起来。将previous_close_price以键控状态存储在CoProcessFunction中。

每天,在开盘前,都会输入最新的收盘价。

这可以使用RichCoFlatMap来完成,但我建议使用CoProcessFunction,因为您可能希望使用侧输出来报告错误(例如,缺少previous_close_price的证券(。

至于其他方法:

  1. 我认为将previous_close_price数据保存在外部数据存储中没有任何好处
  2. 我觉得这个效果不太好。没有可用于触发加载新数据的挂钩,而且,只有在访问状态时才会清除该状态
  3. 对于广播状态来说,这不是一个好的用例,除非集群中的每个人都需要知道所有证券的收盘价

相关内容

  • 没有找到相关文章

最新更新