闪烁 CEP 事件未触发



我已经在 Flink 中实现了 CEP 模式,它按预期连接到本地 Kafka 代理。但是当我连接到基于集群的云 kafka 设置时,Flink CEP 没有触发。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//saves checkpoint
env.getCheckpointConfig().enableExternalizedCheckpoints(
CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

我正在使用升序时间戳提取器,

consumer.assignTimestampsAndWatermarks(
new AscendingTimestampExtractor<ObjectNode>() {
@Override
public long extractAscendingTimestamp(ObjectNode objectNode) {
long timestamp;
Instant instant = Instant.parse(objectNode.get("value").get("timestamp").asText());
timestamp = instant.toEpochMilli();
return timestamp;
}
});

而且我收到警告消息,

升序时间戳提取器:140 - 违反时间戳单调性:1594017872227 <1594017873133

我也尝试使用AssignerWithPeriodicWatermarks和AssignerWithPunctuatedWatermarks,没有一个工作

我附上了未分配水印的 Flink 控制台屏幕截图。 更新了狐狸控制台截图

谁能帮忙?

CEP 必须首先对输入流进行排序,它根据水印进行排序。所以 问题可能出在水印上,但您还没有向我们展示足够的时间来调试原因。一个常见的问题是有一个空闲的源,这可能会阻止水印前进。

但还有其他可能的原因。为了调试这种情况,我建议你查看一些指标,无论是在 Flink Web UI 中,还是在指标系统中(如果你连接了一个(。首先,通过查看管道不同阶段的numRecordsInnumRecordsOutnumRecordsInPerSecondnumRecordsOutPerSecond来检查记录是否在流动。

如果有事件,请查看作业的不同任务中的currentOutputWatermark,看看事件时间是否在推进。

更新:

看起来您可能正在 Kafka 使用者上调用assignTimestampsAndWatermarks,这将导致每个分区的水印。在这种情况下,如果您有一个空闲分区,则该分区不会产生任何水印,这将阻止整体水印。尝试改为在源生成的数据流上调用assignTimestampsAndWatermarks,看看是否可以解决问题。(当然,如果没有每个分区的水印,您将无法使用 AscendingTimestampExtractor,因为流将不按顺序排列。

最新更新