我正在相同的 flink 作业中阅读 2 个 kafka 主题。
Stream1
:来自第一个主题的消息被保存到rocksdb,然后它将与stream2合并。Stream2
:来自第二个主题的消息使用流 1 保存的状态进行丰富,然后它将与 stream1 并集。
主题1 和主题 2 是不同的源,但两个源的输出基本相同。我必须用来自主题 1 的数据来丰富来自主题 2 的数据。
这是流量;
val stream1 = readKafkaTopic1().keyBy(_.memberId).map(saveMemberDetailsToRocksDB)
val stream2 = readKafkaTopic2().keyBy(_.memberId).map(readMemberDetailsAndEnrich)
stream1.union(stream2).addSink(kafkaProducer)
这是问题;
- 这种流程好吗?
stream2
可以访问stream1
为同一memberId
保存的状态吗?
似乎您应该能够通过使用KeyedCoProcessFunction
来实现您想要的。这或多或少会想要:
stream1
.keyBy(_.memberId)
.connect(stream2.keyBy(_.memberId))
.process(new CustomKeyedCoProcessFunction())
这样,您可以将状态保留在单个KeyedCoProcessFunction
中,因此您可以访问stream1
和stream2
。
所以,对于processElement1
你可以做同样的事情,你在map
里面做同样的事情stream1
和processElement2
你可以做你在map
流 2 里做同样的事情。