我有两个Kafka主题,A和b。
有时两个主题都可以空闲,但是,当主题B或A有新数据时,flink应用程序可能需要几分钟才能处理(在空闲之后)。
应用程序配置为使用事件时间,使用for单调时间戳。
作业的结构如下:
- KafkaSource
- ProcessFunction
- KeyBy
- 连接两个流
- CoProcessFunction
ProcessFunction立即从Kafka主题中获取数据(即使在它空闲之后),KeyBy也是如此。
然而,CoProcessFunction的process1或process2都没有被快速触发,我看到在它们被触发之前延迟了大约3分钟,为什么会这样?
当两个主题都有数据连续推送时,性能非常好。
我也试着实现我自己的WatermarkGenerator,像这样
@Override
public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
// don't need to do anything because we work on processing time
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
我可以看到onPeriodicEmit
函数确实每5秒发出一次,但这并不能解决我的问题,并且看着flink web,水印没有进展。
Using Flink 1.14
我怀疑空闲检测有问题:
The Kafka Source does not go automatically in an idle state if
the parallelism is higher than the number of partitions.
You will either need to lower the parallelism
or add an idle timeout to the watermark strategy.
If no records flow in a partition of a stream for that amount of time,
then that partition is considered “idle” and
will not hold back the progress of watermarks in downstream operators.
请参阅https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/kafka/#idleness了解更多详细信息以及如何分配WatermarkStrategy#withIdleness
来解决此问题。