如何在 Flink 中的两个不同的 Kafka 流上应用相同的模式?



我在下面有这个 Flink 程序:

object WindowedWordCount {
val configFactory = ConfigFactory.load()
def main(args: Array[String]) = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val kafkaStream1 = env.addSource(new FlinkKafkaConsumer010[String](topic1, new SimpleStringSchema(), props))
.assignTimestampsAndWatermarks(new TimestampExtractor)
val kafkaStream2 = env.addSource(new FlinkKafkaConsumer010[String](topic2, new SimpleStringSchema(), props))
.assignTimestampsAndWatermarks(new TimestampExtractor)
val partitionedStream1 = kafkaStream1.keyBy(jsonString => {
extractUserId(jsonString)
})
val partitionedStream2 = kafkaStream2.keyBy(jsonString => {
extractUserId(jsonString)
})
//Is there a way to match the userId from partitionedStream1 and partitionedStream2 in this same pattern?
val patternForMatchingUserId = Pattern.begin[String]("start")
.where(stream1.getUserId() == stream2.getUserId()) //I want to do something like this
//Is there a way to pass in partitionedStream1 and partitionedStream2 to this CEP.pattern function?
val patternStream = CEP.pattern(partitionedStream1, patternForMatchingUserId)
env.execute()
}
}

在上面的 flink 程序中,我有两个名为partitionedStream1partitionedStream2的流,它们keyedBy用户 ID。

我想以某种方式比较patternForMatchingUserId模式中两个流的数据(类似于我上面显示的方式(。有没有办法将两个流传递给CEP.Pattern函数?

像这样:

val patternStream = CEP.pattern(partitionedStream1, partitionedStream2, patternForMatchingUserId)

您无法将两个流传递给CEP,但可以传递组合流。

如果两个流具有相同的类型/架构。你可以把它们联合起来。我相信这个解决方案符合你的情况。

partitionedStream1.union(partitionedStream2).keyBy(...)

如果它们具有不同的架构。您可以使用内部的一些自定义逻辑将它们转换为一个流,例如coFlatMap.

相关内容

  • 没有找到相关文章

最新更新