没有从 Flink 模式流中打印出任何内容



我在下面有这段代码:

import java.util.Properties
import com.google.gson._
import com.typesafe.config.ConfigFactory
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.cep.scala.CEP
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
object WindowedWordCount {
val configFactory = ConfigFactory.load()
def main(args: Array[String]) = {
val brokers = configFactory.getString("kafka.broker")
val topicChannel1 = configFactory.getString("kafka.topic1")
val props = new Properties()
...
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val dataStream = env.addSource(new FlinkKafkaConsumer010[String](topicChannel1, new SimpleStringSchema(), props))
val partitionedInput = dataStream.keyBy(jsonString => {
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account")
})
val priceCheck = Pattern.begin[String]("start").where({jsonString =>
val jsonParser = new JsonParser()
val jsonObject = jsonParser.parse(jsonString).getAsJsonObject()
jsonObject.get("account").toString == "iOS"})
val pattern = CEP.pattern(partitionedInput, priceCheck)
val newStream = pattern.select(x =>
x.get("start").map({str =>
str
})
)
newStream.print()
env.execute()
}
}

由于某种原因,在上面的代码中newStream.print()没有打印出任何内容。我很肯定 Kafka 中的数据与我上面定义的模式相匹配,但由于某种原因,没有打印出任何内容。

任何人都可以帮我发现此代码中的错误吗?

编辑:

class TimestampExtractor extends AssignerWithPeriodicWatermarks[String] with Serializable {
override def extractTimestamp(e: String, prevElementTimestamp: Long) = {
val jsonParser = new JsonParser()
val context = jsonParser.parse(e).getAsJsonObject.getAsJsonObject("context")
Instant.parse(context.get("serverTimestamp").toString.replaceAll(""", "")).toEpochMilli
}
override def getCurrentWatermark(): Watermark = {
new Watermark(System.currentTimeMillis())
}
}

我在 flink 文档上看到,您可以在extractTimestamp方法中返回prevElementTimestamp(如果您使用的是 Kafka010(并在getCurrentWatermark方法中new Watermark(System.currentTimeMillis)

但我不明白prevElementTimestamp是什么,或者为什么人们会new Watermark(System.currentTimeMillis)返回为水印而不是其他东西。您能否详细说明为什么我们这样做,请问WaterMarkEventTime如何一起工作?

您确实将作业设置为在EventTime中工作,但不提供时间戳和水印提取器。

有关在事件时间工作的更多信息,请参阅这些文档。如果您想使用 kafka 嵌入式时间戳,此文档可能会对您有所帮助。

EventTimeCEP 库会在水印到达时缓冲事件,以便正确处理无序事件。在您的情况下,不会生成水印,因此事件会无限缓冲。


编辑:

  1. 对于prevElementTimestamp,我认为文档非常清楚:

    使用 Kafka 中的时间戳时,无需定义时间戳提取器。extractTimestamp(( 方法的 previousElementTimestamp 参数包含 Kafka 消息携带的时间戳

    由于 Kafka 0.10.x Kafka 消息可以嵌入时间戳。

  2. 在这种情况下,生成Watermarknew Watermark(System.currentTimeMillis)不是一个好主意。您应该根据您对数据的了解创建Watermark。有关WatermarkEventTime如何协同工作的信息,我不能比文档更清楚

相关内容

  • 没有找到相关文章

最新更新