卡夫卡流:如何在聚合的同时产生一个主题



我目前有一些代码可以使用聚合构建KTable:

inputTopic.groupByKey().aggregate(
Aggregator::new,
(key, value, aggregate) -> {
someProcessingDoneHere;
return aggregate;
},
Materialized.with(Serdes.String(), Serdes.String())
);

一旦接收到给定数量的消息并为单个键聚合,我想将最新的聚合状态推送到另一个主题,然后删除表中的键。

很明显,我可以使用一个普通的卡夫卡制作人,并有这样的东西:

inputTopic.groupByKey().aggregate(
Aggregator::new,
(key, value, aggregate) -> {
someProcessingDoneHere;
if (count > threshold) {
producer.send(new ProducerRecord<String,String>("output-topic", 
key, aggregate));
return null;
}
return aggregate;
},
Materialized.with(Serdes.String(), Serdes.String())
);

但我正在寻找一个更";流";方法

有什么提示吗?

我认为这里最好的解决方案是将聚合返回到流中,然后在将其发送到主题之前过滤所需的值。

inputTopic.groupByKey().aggregate(
Aggregator::new,
(key, value, aggregate) -> {
someProcessingDoneHere;
return aggregate;
},
Materialized.with(Serdes.String(), Serdes.String())
)
.toStream()
.filter((key, value) -> (value.count > threshold)
.to("output-topic");

编辑:我刚刚意识到你想在序列化之前完成这个。我认为唯一的方法是使用转换器或处理器,而不是聚合。

在那里,您可以访问StateStore而不是KTable。它还允许您访问context.forward(),使您可以以任何方式向下游转发消息。

一些伪代码来展示如何使用转换来完成

@Override
public Transformer<String, String, KeyValue<String, String>> get() {
return new Transformer<String, String, KeyValue<String, String>>() {
private KeyValueStore<String, String> stateStore;
private ProcessorContext context;
@SuppressWarnings("unchecked")
@Override
public void init(final ProcessorContext context) {
this.context = context;
stateStore = (KeyValueStore<String, String>) context.getStateStore(STATE_STORE_NAME);
}
@Override
public KeyValue<String, String> transform(String key, String value) {
String prevAggregation = stateStore.get(key);
//use prevAggregation and value to calculate newAggregation here:
//...
if (newAggregation.length() > threshold) {
context.forward(key, newAggregation);
stateStore.delete(key);
} else {
stateStore.put(key, newAggregation);
}
return null; // transform ignore null
}
@Override
public void close() {
// Note: The store should NOT be closed manually here via `stateStore.close()`!
// The Kafka Streams API will automatically close stores when necessary.
}
};
}

相关内容

  • 没有找到相关文章

最新更新