Kafka Streams-GroupBy-Late Event-persistentWindowStore-带宽限期和



我的目的是计算每秒从源到目标的成功和失败消息,并以每日为单位对其结果进行汇总。

我有两种选择;

  1. 流式处理事件,然后将它们分组为时间#源#目的地
KeyValueBytesStoreSupplier streamStore = Stores.persistentKeyValueStore("store-name");  
sourceStream.selectKey((k, v) -> v.getDataTime() + KEY_SEPERATOR + SRC + KEY_SEPERATOR + DEST ).groupByKey().aggregate(
DO SOME Aggregation,
Materialized.<String, AggregationObject>as(streamStore)
.withKeySerde(Serdes.String())
.withValueSerde(AggregationObjectSerdes));

在尝试了上面的这种方法后,我们注意到状态存储正在增加,因为唯一密钥的数量正在增加,如果我是正确的,因为状态主题只是"紧凑的",它们永远不会过期。

NumberOfUniqueKeys=86.400秒一天X来源X目的

然后我们认为,如果我们不在KEY块中放入时间字段,我们可以减少状态存储的大小。我们尝试将开窗操作作为第二种方法。

  1. 使用带有persistentWindowStore、CustomTimeStampExtractor、WindowBy、Suppress的窗口操作

WindowBytesStoreSupplier streamStore = Stores.persistentWindowStore("store-name", Duration.ofHours(6), Duration.ofSeconds(1), false);
sourceStream.selectKey((k, v) -> SRC + KEY_SEPERATOR + DEST)
.groupByKey()  .windowedBy(TimeWindows.of(Duration.ofSeconds(1)).grace(Duration.ofSeconds(5)))
.aggregate(
{
DO SOME Aggregation
}, Materialized.<String, AggregationObject>as(streamStore)
.withKeySerde(Serdes.String())
.withValueSerde(AggregationObjectSerdes))
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())).toStream();`

在尝试了第二种方法之后,我们减少了状态存储的大小,但现在我们遇到了延迟到达事件的问题。然后,我们通过抑制操作添加了5秒的宽限期,除了使用宽限期和抑制操作不能保证处理所有延迟到达的事件外,抑制操作的另一个副作用是延迟,因为它在窗口宽限期后发出聚合结果。

BTW

使用窗口操作导致出现如下警告消息"警告1-[-StreamThread-2]o.a.k.s.state.internals.WindowKeySchema:警告:窗口结束时间被截断为长.MAX">

我从源代码中检查了原因,从这里找到了https://github.com/a0x8o/kafka/blob/master/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java

/**
* Safely construct a time window of the given size,
* taking care of bounding endMs to Long.MAX_VALUE if necessary
*/
static TimeWindow timeWindowForSize(final long startMs,
final long windowSize) {
long endMs = startMs + windowSize;
if (endMs < 0) {
LOG.warn("Warning: window end time was truncated to Long.MAX");
endMs = Long.MAX_VALUE;
}
return new TimeWindow(startMs, endMs);
}

实际上,endMs如何低于0…对我来说毫无意义

问题?

  1. 如果我们采用方法1,如何减少状态存储大小?在方法1中,保证所有事件都将被处理,并且不会因为延迟而丢失事件
  2. 如果我们采用方法2,我应该如何调整我的逻辑,捕捉延迟到达的数据并减少延迟
  3. 为什么我在方法2中得到警告消息,尽管在我的模型中所有的时间字段都是正的
  4. 除了这两种方法之外,你还可以提出其他选择吗

我需要一些专家帮助:(

BR,

根据邮件kafka邮件组关于警告消息

类似"警告1-[-StreamThread-2]o.a.k.s.state.internals.WindowKeySchema:警告:窗口结束时间被截断为Long.MAX"的警告消息

它是写给我的:

您可以获得以下消息"o.a.k.s.state.internals.WindowKeySchema:警告:当您的创建TimeWindowDeserializer时没有windowSize。有两个TimeWindowDeserializer的构造函数,是否使用窗口大小?

https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java#L46-L55

它使用Long.MAX_VALUE调用WindowKeySchemahttps://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedDeserializer.java#L84-L90

相关内容

  • 没有找到相关文章

最新更新