如何在 Flink 中使用模式匹配 where 子句?



我在下面有这个 Flink 程序:

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))
val partitionedInput = dataStream.keyBy(jsonString => {
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account")
})
val pattern = Pattern.begin[String]("start").where(jsonString => 
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account") == "iOS") //ERROR HERE
val patternStream = CEP.pattern(partitionedInput, pattern)

我在val pattern = ...行收到错误,说Expected IterativeCondition[String], actual: (Nothing) => Unit.

我的dataStream由 JSON 对象组成,我通过 JSON 对象中的帐户密钥在keyBy密钥中解析这些对象。然后我正在尝试创建一个模式,但在创建模式时出现错误。

确保使用正确的 API。对于 scala,您应该导入

import org.apache.flink.cep.scala.pattern.Pattern

而不是

import org.apache.flink.cep.pattern.Pattern

相关内容

  • 没有找到相关文章

最新更新