我发现了两个问题,询问如果没有新记录放入分区,为什么不会发出结果记录:
1."Kafka 流抑制会话窗口聚合"和
2."Kafka 流(抑制):通过超时关闭时间窗口">
在这两个问题的答案中,解释是有必要发送一个新记录才能发出一个。
我不明白为什么在没有新记录的情况下超时后发出记录会违反抑制合同,并希望得到解释。
到目前为止,最好的建议是使用虚拟记录来触发发射。
我认为关闭和重新启动流(拓扑)可能比编写虚拟记录更合适。我以为流的新实例会使记录达到峰值并在超时已经过期时发出结果。
但是,我尝试并发现它不起作用。如果可能的话,我将不胜感激。
@Slf4j
public class KafkaStreamVerticle extends AbstractVerticle {
private KafkaStreams streams;
@Override
public void start(Future<Void> startFuture) throws Exception {
Single.fromCallable(() -> getStreamConfiguration()).subscribe(config -> {
final StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream(KafkaProducerVerticle.TOPIC)
.flatMapValues((k, v) -> List.<JsonObject>of(new JsonObject(v).put("origKey", k)))
.selectKey((k, v) -> v.getString(KafkaProducerVerticle.CATEGORY))
.flatMapValues(v -> List.<String>of(v.toString()))
.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(Duration.ofSeconds(4)).grace(Duration.ZERO)).count()
// .suppress(Suppressed.untilWindowCloses(BufferConfig.unbounded())).toStream().foreach((k,
.suppress(Suppressed.untilTimeLimit(Duration.ofSeconds(4), BufferConfig.unbounded()))
.toStream().foreach((k, v) -> log.info("********* {}: {} - {}: {}", k.key(),
k.window().start(), k.window().end(), v));
streams = buildAndStartsNewStreamsInstance(config, builder);
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
restartStreamsPeriodicaly(config, builder, 30_000L);
log.info("consumer deployed");
startFuture.complete();
});
}
private KafkaStreams buildAndStartsNewStreamsInstance(Properties config,
final StreamsBuilder builder) {
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.cleanUp();
streams.start();
return streams;
}
private void restartStreamsPeriodicaly(Properties config, final StreamsBuilder builder,
@NonNull Long period) {
vertx.setPeriodic(period, l -> {
log.info("restarting streams!!");
streams.close();
streams = buildAndStartsNewStreamsInstance(config, builder);
});
}
private Properties getStreamConfiguration() {
final Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "suppress-example");
streamsConfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "suppress-client");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass().getName());
streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, "/tmp/kafka-streams");
streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10);
streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0L);
return streamsConfiguration;
}
}
Kafka Stream 提供事件时间语义,这意味着,它只是基于记录时间戳的内部时间高级(内部时间永远不会基于挂钟时间前进)。您正在采取的"超时",它也基于事件时间(不是挂钟时间)。
假设您有一个大小为 5 的窗口(即,[0,5)
将是一个窗口),并且您看到的数据为 ts=1,2,3。这意味着下一条记录的时间戳=4,并且必须包含在窗口中。但是,如果没有新数据到达,则无论等待多长时间,都无法发出窗口结果。仅当时间戳 = 5 的记录到达时,内部时间才会前进并且现在大于窗口结束时间,并且发出窗口的结果。如果 suppress() 会在某个基于挂钟的超时后发出数据,并且下一条记录的时间戳=4,它将发出错误的结果。
此外,suppress() 会记住它的内部状态和时间。因此,即使您重新启动应用程序,suppress() 仍将缓冲数据,并且仍将等待 timestamp=5 的记录发出数据。