ApacheFlink:具有大型状态的管道的最佳实践



背景

我想把一个在DB上运行的作业转移到Flink上,让系统更有效地工作;实时";。此作业将重新计算";状态";所有帐户的每10秒,如果该值满足某些条件,我们将通知用户。";状态";账户的价值是根据他们持有的股票数量和这些股票的当前价格来计算的。

我的解决方案

我的想法是用这些输入创建一个管道:

  • BeginStream:当日开始数据(初始数据(:最后一天的股价,每个账户的股票数量。数据可以从DB表中读取,也可以从Kafka主题中加载
  • PriceStream:一个Kafka主题,每条消息都包含一只股票的价格,如下所示:{"名称":"存货名称","价格":currentPrice}
  • StockStream:每个消息的Kafka主题都包含账户买入或卖出的股票数量(正数或负数(,如下所示:{帐号:帐号,存货:存货名称,数量:变更号}

我的解决方案:加入BeginStream和StockStream,然后加入PriceStream(所有都将是keyBy StockName(,为了计算BeginStream的每只股票,我将创建一个名为stockStates的ListState,其中包含关于股票数量和当前价格的信息。

对于StockStream和PriceStream的每个事件,我将更新stockStates,然后计算";状态";。如果这个值满足某些条件,我们将向其他Kafka主题发送消息,并从这个ListState 中删除这个帐户

BeginStream
.keyby(StockName)
.connect(
StockStream.
.keyby(StockName))
.flatMap(new EnrichmentFucntion())
...
.connect(
PriceStream.
.keyby(StockName))
.flatMap(new EnrichmentFucntion())

该系统包含约500000个账户,1.000只股票,每个账户持有10-20只股票,PriceStream和StockStream的吞吐量约为1.000条消息/秒。

问题

我是Flink的新手,因此我不太确定我的解决方案是否是一个好方法?类似的问题有什么设计模式吗?对于大约1000个ListState的数量(每个列表包含大约500.000*10/1000=5.000帐户的状态(,我应该使用RocksDB进行状态存储吗?

如有任何建议,我们将不胜感激。

是的,您的用例可以在flink中实现。RocksDB作为状态后端是一个不错的选择。

相关内容

  • 没有找到相关文章

最新更新