在 flink 上更新流图内的并发映射



我有一个流,它不断流式传输某些键的最新值。

流 A:DataStream[(String,Double)]

我还有另一个流想要在每个进程调用上获取最新值。

我的方法是引入一个concurrentHashMap,该将由流 A 更新并由第二个流读取。

val rates = new concurrentHasMap[String,Double].asScala
val streamA : DataStream[(String,Double)]= ???
streamA.map(keyWithValue => rates(keyWithValue._1)= keyWithValue._2) //rates never gets updated
rates("testKey")=2 //this works
val streamB: DataStream[String] = ???
streamB.map(str=> rates(str)  // rates does not contain the values of the streamA at this point
//some other functionality
) 

是否可以从流中更新并发映射?与另一个流共享流中数据的任何其他解决方案也是可以接受

您尝试使用的行为不会以分布式方式工作,基本上如果您拥有parellelism> 1,它将不起作用。在您的代码中,rates实际上是更新的,但在不同的并行运算符实例中。

实际上,在这种情况下,您要做的是使用旨在解决您面临的问题的BroadcastState

在您的特定用例中,它看起来像这样:

val streamA : DataStream[(String,Double)]= ???
val streamABroadcasted = streamA.broadcast(<Your Map State Definition>)
val streamB: DataStream[String] = ???
streamB.connect(streamABroadcasted)

然后,您可以轻松地使用BroadcastProcessFunction来实现您的逻辑。有关广播状态模式的更多信息,请参阅此处

相关内容

  • 没有找到相关文章

最新更新