在Kafka流上创建用于窗口数据的SERD



我有一些麻烦来创建一个与我正在汇总的数据一起使用的SERD,并且需要通过'.to((发送到另一个主题,但是,我需要为窗口数据创建SERDE,我不确定如何做。

我们可以通过以下方式为窗口数据创建串行器和避难器。

StringSerializer stringSerializer = new StringSerializer();
StringDeserializer stringDeserializer = new StringDeserializer();
Serde<String> stringSerde = Serdes.serdeFrom(stringSerializer,stringDeserializer);
WindowedSerializer<String> windowedSerializer = new WindowedSerializer<>(stringSerializer);
WindowedDeserializer<String> windowedDeserializer = new WindowedDeserializer<>(stringDeserializer);
Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(windowedSerializer,windowedDeserializer);

在下面给出的窗口数据中使用串行器/欲望器。

KStream<String,StockTransaction> transactionKStream =  kStreamBuilder.stream(stringSerde,transactionSerde,"stocks");
transactionKStream.map((k,v)-> new KeyValue<>(v.getSymbol(),v))
                              .through(stringSerde, transactionSerde,"stocks-out")
                              .groupBy((k,v) -> k, stringSerde, transactionSerde)
                              .aggregate(StockTransactionCollector::new,
                                   (k, v, stockTransactionCollector) -> stockTransactionCollector.add(v),
                                   TimeWindows.of(10000),
                                   collectorSerde, "stock-summaries")
                    .to(windowedSerde,collectorSerde,"transaction-summary");

我建议您浏览以下内容以获取更多信息。

https://www.programcreek.com/java-api-examples/index.php?api=org.apache.kafka.kafka.kafka.streams.kstreams.kstreams.interam.internals.windowedserialializer

相关内容

  • 没有找到相关文章

最新更新