在我的管道设置中,我看不到会话窗口的侧输出。我正在使用 Flink 1.9.1
版本 1。 我拥有的是这个:
messageStream.
.keyBy(tradeKeySelector)
.window(ProcessingTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
.sideOutputLateData(lateTradeMessages)
.process(new CumulativeTransactionOperator())
.name("Aggregate Transaction Builder");
lateTradeMessages实现SessionWindowTimeGapExtractor并返回5秒。
此外,我还有这个:
messageStream.getSideOutput(lateTradeMessages)
.keyBy(tradeKeySelector)
.process(new KeyedProcessFunction<Long, EnrichedMessage, Transaction>() {
@Override
public void processElement(EnrichedMessage value, Context ctx, Collector<Transaction> out) throws Exception {
System.out.println("Process Late messages For Aggregation");
out.collect(new Transaction());
}
})
.name("Process Late messages For Aggregation");
问题是,当我使用应该错过窗口时间的相同键发送消息时,我永远不会看到"处理延迟消息以进行聚合"。
当会话窗口通过并且我"立即"为同一键发送新消息时,它会触发新的会话窗口而不进入后期侧输出。
不知道我在这里做错了什么。
我想在这里实现的是捕捉"迟到的事件"并尝试 重新处理它们。
我将不胜感激任何帮助。
版本2,在@Dominik Wosiński评论之后:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1000, 1000));
env.setParallelism(1);
env.disableOperatorChaining();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.getConfig().setAutoWatermarkInterval(1000);
DataStream<RawMessage> rawBusinessTransaction = env
.addSource(new FlinkKafkaConsumer<>("business",
new JSONKeyValueDeserializationSchema(false), properties))
.map(new KafkaTransactionObjectMapOperator())
.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<RawMessage>() {
@Nullable
@Override
public Watermark getCurrentWatermark() {
return new Watermark(System.currentTimeMillis());
}
@Override
public long extractTimestamp(RawMessage element, long previousElementTimestamp) {
return element.messageCreationTime;
}
})
.name("Kafka Transaction Raw Data Source.");
messageStream
.keyBy(tradeKeySelector)
.window(EventTimeSessionWindows.withDynamicGap(new TradeAggregationGapExtractor()))
.sideOutputLateData(lateTradeMessages)
.process(new CumulativeTransactionOperator())
.name("Aggregate Transaction Builder");
水印正在进展中,我已经检查了 Flink 的指标。Window 运算符正在执行,但仍然没有延迟输出。
顺便说一句,Kafka 主题可能处于空闲状态,所以我必须定期发出新的水印。
水印方法对我来说看起来很可疑。通常,此时会输出最新的事件时间戳。
只是一些背景信息,以便更容易理解。
延迟事件是指在处理水印之后到事件发生后的事件。请考虑以下示例:
event1 @time 1
event2 @time 2
watermark1 @time 3
event3 @time 1 <-- late event
event4 @time 4
您的水印方法几乎会将所有过去的事件渲染为延迟事件(由于 1s 水印间隔,有点容忍(。这也将使后处理和追赶变得不可能。
然而,你实际上没有看到任何后期事件,这让我更惊讶。您能否仔细检查您的水印方法,描述您的用例并提供示例数据?很多时候,实现对于实际用例并不理想,应该以不同的方式解决。
您在您的情况下使用ProcessingTime
,这意味着系统时间用于测量DataStream
中时间的流量。
对于每个事件,分配给此事件的时间戳是您在 Flink 管道中收到数据的那一刻。这意味着 Flink 处理时间无法使事件无序。正因为如此,你的窗口永远不会有迟到的元素。
如果您切换到EventTime
,那么为了获得正确的输入数据,您应该能够看到传递到端输出的后期元素。
你可能应该看看文档,其中解释了 Flink 中各种时间概念。