我有一个对象流,我想在其中计算该对象中字段的平均值,然后将该平均值保存回该对象。我希望有一个 5 分钟的翻滚窗口,保留 1 小时。 我是卡夫卡的新手,所以我想知道这是否是解决问题的正确方法。
首先,我创建一个持久存储:
StoreBuilder<WindowStore<String, Double>> averagesStoreSupplier =
Stores.windowStoreBuilder(
Stores.persistentWindowStore(WINDOW_STORE_NAME, Duration.ofHours(1), Duration.ofMinutes(5), true),
Serdes.String(),
Serdes.Double());
streamsBuilder.addStateStore(averagesStoreSupplier);
然后我使用以下方法调用我的转换器:
otherKTable
.leftJoin(objectKTable.transformValues(new AveragingTransformerSupplier(WINDOW_STORE_NAME), WINDOW_STORE_NAME),
myValueJoiner)
.to("outputTopic")
这是我的变压器:
public class AveragingTransformerSupplier implements ValueTransformerWithKeySupplier<String, MyObject, MyObject> {
private final String stateStoreName;
public TelemetryAveragingTransformerSupplier(final String stateStoreName) {
this.stateStoreName = stateStoreName;
}
public ValueTransformerWithKey<String, MyObject, MyObject> get() {
return new ValueTransformerWithKey<>() {
private WindowStore<String, Double> averagesStore;
@Override
public void init(ProcessorContext processorContext) {
averagesStore = Try.of(() ->(WindowStore<String, Double>) processorContext.getStateStore(stateStoreName)).getOrElse((WindowStore<String, Double>)null);
}
@Override
public MyObject transform(String s, MyObject myObject) {
if (averagesStore != null) {
averagesStore.put(s, myObject.getNumber());
Instant timeFrom = Instant.ofEpochMilli(0); // beginning of time = oldest available
Instant timeTo = Instant.now();
WindowStoreIterator<Double> itr = averagesStore.fetch(s, timeFrom, timeTo);
double sum = 0.0;
int size = 0;
while(itr.hasNext()) {
KeyValue<Long, Double> next = itr.next();
size++;
sum += next.value;
}
myObject.setNumber(sum / size);
}
return myObject;
}
@Override
public void close() {
if (averagesStore != null) {
averagesStore.flush();
}
}
};
}
}
我有几个问题。 首先,我定义 WindowStore 的方式是形成翻转窗口的正确方法吗? 如何创建跳跃窗口?
其次,在我的变压器里,我从商店里得到从开始到现在的所有物品。 既然我将其定义为 5 分钟的窗口和 1 小时的保留期,这是否意味着商店中的商品是 5 分钟数据的快照? 保留在这里有什么作用?
我有这个处理琐碎的情况,但不确定是否有更好的方法来使用聚合和连接来做到这一点,或者即使我这样做是否正确。 此外,我还必须围绕在尝试捕获中获取商店的检索,因为 init 被多次调用,有时我会收到Processor has no access to StateStore
异常。
对于此用例,我建议使用DSL而不是处理器API。 详情请参阅 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Stream+Usage+Patterns。
我有几个问题。首先,我定义 WindowStore 的方式是形成翻转窗口的正确方法吗?如何创建跳跃窗口?
窗口存储可用于跳跃或翻转窗口 - 这取决于您在处理器中如何使用它,而不是您如何创建商店,以及您获得的窗口语义。
其次,在我的变压器里,我从商店里得到从开始到现在的所有物品。既然我将其定义为 5 分钟的窗口和 1 小时的保留期,这是否意味着商店中的商品是 5 分钟数据的快照?保留在这里有什么作用?
创建应用商店时windowSize
的参数无法按预期方式工作。您需要使用put(key, value, windowStartTimestamp)
在Transformer
代码中手动编写窗口逻辑 - atm,您正在使用使用context.timestamp()
的put(key, value)
,即当前记录时间戳,作为 windowStartTimestamp - 我怀疑这是你想要的。保留时间基于窗口时间戳,即旧窗口过期后将被删除。