Apache Flink RocksDB 状态管理



我正在相同的 flink 作业中阅读 2 个 kafka 主题。

  • Stream1:来自第一个主题的消息被保存到rocksdb,然后它将与stream2合并。
  • Stream2:来自第二个主题的消息使用流 1 保存的状态进行丰富,然后它将与 stream1 并集。

主题1 和主题 2 是不同的源,但两个源的输出基本相同。我必须用来自主题 1 的数据来丰富来自主题 2 的数据。

这是流量;

val stream1 = readKafkaTopic1().keyBy(_.memberId).map(saveMemberDetailsToRocksDB)
val stream2 = readKafkaTopic2().keyBy(_.memberId).map(readMemberDetailsAndEnrich)
stream1.union(stream2).addSink(kafkaProducer)

这是问题;

  1. 这种流程好吗?
  2. stream2可以访问stream1为同一memberId保存的状态吗?

似乎您应该能够通过使用KeyedCoProcessFunction来实现您想要的。这或多或少会想要:

stream1
.keyBy(_.memberId)
.connect(stream2.keyBy(_.memberId))
.process(new CustomKeyedCoProcessFunction())

这样,您可以将状态保留在单个KeyedCoProcessFunction中,因此您可以访问stream1stream2

所以,对于processElement1你可以做同样的事情,你在map里面做同样的事情stream1processElement2你可以做你在map流 2 里做同样的事情。

相关内容

  • 没有找到相关文章

最新更新