I want to join two streams coming from Kafka producers, but the join does not work. I use AssignerWithPeriodicWatermarks to define my timestamp assigner and try to join the two streams over a 3-minute window, but I get no output. I printed both streams to make sure their events are close enough in time.
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.watermark.Watermark
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import spray.json._

object Job {

  class Assigner extends AssignerWithPeriodicWatermarks[String] {
    // 1 s in ms
    val bound: Long = 1000
    // the maximum observed timestamp
    var maxTs: Long = Long.MinValue

    override def getCurrentWatermark: Watermark =
      new Watermark(maxTs - bound)

    override def extractTimestamp(r: String, previousTS: Long): Long = {
      maxTs = Math.max(maxTs, previousTS)
      previousTS
    }
  }

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment // createLocalEnvironment()
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9093")
    properties.setProperty("group.id", "test")

    val consumerId = new FlinkKafkaConsumer[String]("topic_id", new SimpleStringSchema(), properties)
    val streamId = env.addSource(consumerId).assignTimestampsAndWatermarks(new Assigner)
    val streamIdParsed = streamId
      .map(s => s.parseJson)
      .map(value => (value.asJsObject.getFields("id")(0).toString(), value.asJsObject.getFields("m", "w")))

    val consumerV = new FlinkKafkaConsumer[String]("topic_invoice", new SimpleStringSchema(), properties)
    val streamV = env.addSource(consumerV).assignTimestampsAndWatermarks(new Assigner)
    val streamVParsed = streamV
      .map(s => s.parseJson)
      .map(value => (value.asJsObject.getFields("id")(0).toString(),
        value.asJsObject.getFields("products")(0).toString().parseJson.asJsObject.getFields("id2", "id3")))

    streamIdParsed.join(streamVParsed)
      .where(_._1).equalTo(_._1)
      .window(SlidingEventTimeWindows.of(Time.seconds(60), Time.seconds(1)))
      .apply { (e1, e2) => (e1._1, "test") }
      .print()

    env.execute()
  }
}
The problem is that you are using a periodic assigner but have not set the autoWatermarkInterval. You need to do the following:

env.getConfig.setAutoWatermarkInterval([someinterval])

That should fix the problem of no watermarks being generated.
Other things that could be wrong (for you to check, since the information you provided is too sparse to narrow it down):
- no events arriving on Kafka at all
- no watermark progress on either topic
- your data operates at minute granularity while your code works in seconds
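To see why stalled watermarks block the join, here is a minimal, Flink-free sketch of the watermark arithmetic in the question's Assigner (the 1 s `bound` and the 60 s window length come from the question; the object and method names are illustrative). The periodic watermark trails the largest timestamp seen so far by `bound`, and an event-time window can only fire once the watermark passes the window's end:

```scala
object WatermarkSketch {
  // 1 s lag, mirroring `bound` in the question's Assigner
  val bound: Long = 1000L

  // Given all timestamps observed so far, the periodic watermark is
  // max(timestamps) - bound, mirroring getCurrentWatermark above.
  def watermarkFor(timestamps: Seq[Long]): Long =
    timestamps.max - bound

  def main(args: Array[String]): Unit = {
    // Events up to 59 s: watermark is 58000 < 60000,
    // so the [0 s, 60 s) window cannot fire yet.
    println(watermarkFor(Seq(10000L, 30000L, 59000L)))
    // One more event at 61 s pushes the watermark to 60000,
    // past the window end: the window is complete and fires.
    println(watermarkFor(Seq(10000L, 30000L, 59000L, 61000L)))
  }
}
```

If the watermark interval is never set, getCurrentWatermark is never called, so this value never reaches the window operator and the join never emits, no matter how close the events are in time.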