我在下面有这个 Flink 程序:
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))
val partitionedInput = dataStream.keyBy(jsonString => {
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account")
})
val pattern = Pattern.begin[String]("start").where(jsonString =>
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account") == "iOS") //ERROR HERE
val patternStream = CEP.pattern(partitionedInput, pattern)
我在val pattern = ...
行收到错误,说Expected IterativeCondition[String], actual: (Nothing) => Unit
.
我的dataStream
由 JSON 对象组成,我通过 JSON 对象中的帐户密钥在keyBy
密钥中解析这些对象。然后我正在尝试创建一个模式,但在创建模式时出现错误。
确保使用正确的 API。对于 scala,您应该导入
import org.apache.flink.cep.scala.pattern.Pattern
而不是
import org.apache.flink.cep.pattern.Pattern