我正在通过dataartisans的flink教程材料,由于某种原因,当我到达示例文件possplaceplacesfromkafka.scala时我没有将任何输出发送到Stdout。
...
// find popular places
val popularSpots = rides
// match ride to grid cell and event type (start or end)
.map(new GridCellMatcher)
// partition by cell id and event type
.keyBy( k => k )
// build sliding window
.timeWindow(Time.minutes(15), Time.minutes(5))
// count events in window
.apply{ (key: (Int, Boolean), window, vals, out: Collector[(Int, Long, Boolean, Int)]) =>
out.collect( (key._1, window.getEnd, key._2, vals.size) )
}
// print result on stdout
popularSpots.print()
...
我已经确认数据已从Kafka OK中获取,并且当它尝试执行" TimeWindow"操作时,我似乎没有输出。如果我删除" TimeWindow"操作,我可以看到输出的"钥匙比"数据。我缺少一些明显的东西吗?
您是否为源配置了适当的加速度?默认情况下(没有加速因素),源模拟原始数据,即,它以与最初生成的相同速度发射记录。这意味着产生1分钟的数据需要1分钟。
窗口操作员在最后15分钟的数据中每5分钟聚合一次。因此,将需要5分钟,直到窗口操作员产生第一个结果。
如果将加速因子设置为600,则将在1秒内获得10分钟的数据。
如果有人有同样的问题,这是我的问题。
我的kafka主题有多个分区,但是将所有测试数据生成一个分区(0),一旦我拥有> 1个kafka消费者,除了分配给分区0的消费者外,所有的消费者都没有接收。任何数据,因此不会在操作员链中发送任何水印 - 这会导致窗口功能停止发射数据(这也是为什么在这些情况下处理时间正常的原因)。这是一个相关的JIRA:
https://issues.apache.org/jira/browse/flink-5479
通常,有几个原因导致弗林克作业可能不会产生任何输出,但是一个非常普遍的原因与水印有关。Flink的活动时间时钟只有在当前水印的前进,因此如果没有水印,活动时间窗口就永远不会开火。
在弗林克训练练习的情况下,出租车骑行源为您提供水印。但是,既然您正在使用KAFKA源,则必须实现时间戳提取器和水印生成器,然后在流上调用assignTimestampsAndWatermarks
(请参阅文档)。BoundedOutOfOrdernessTimestampExtractor
延迟与写入Kafka的作业配置的延迟相匹配。