我在一个股票市场项目中使用Apache Flink来计算当前的价格变化。公式为
price_change = (current_price - previous_close_price) / previous_close_price
previous_close_price
是证券在前一交易日的收盘价。每天开市前,我都需要更新previous_close_price
。
现在我想出了几个解决方案,但我不知道哪一个是最好的。
将
previous_close_price
存储在redis中,并在每次计算中获取价格。更新价格既简单又灵活,但这种解决方案可能会影响性能。将状态的TTL设置为1天。在旧状态过期时获取新状态。但它并不灵活,因为TTL是硬编码的。
广播状态模式。我不确定这个解决方案是否有效。
给flink发个特别的信息。当flink收到消息时,它会更新
previous_close_price
。
欢迎提出任何建议。
我建议在#4:上使用一个变体
有两个来源,一个只用于收盘价,另一个用于交易流。通过安全性对两个流进行加密,并使用CoProcessFunction将它们连接起来。将previous_close_price以键控状态存储在CoProcessFunction中。
每天,在开盘前,都会输入最新的收盘价。
这可以使用RichCoFlatMap来完成,但我建议使用CoProcessFunction,因为您可能希望使用侧输出来报告错误(例如,缺少previous_close_price的证券(。
至于其他方法:
- 我认为将previous_close_price数据保存在外部数据存储中没有任何好处
- 我觉得这个效果不太好。没有可用于触发加载新数据的挂钩,而且,只有在访问状态时才会清除该状态
- 对于广播状态来说,这不是一个好的用例,除非集群中的每个人都需要知道所有证券的收盘价