如何在Kafka流中打印TimeWindowedKStream和KTable



我们有一个Kafka进程,它将主题作为输入,并将定时窗口写入输出主题。。正在使用以下代码。出于调试目的,我想打印TimeWindowedKStream(groupedStream(和KTable(aggregatedTable(并查看输出。。

String intopic = input_topic;
Long window = 60;
String outtopic = output_topic;

final Serde<String> stringSerde = Serdes.String();
Properties property = new Properties();
property.put("bootstrap.servers", "127.0.0.1:9092");
property.put("group.id", "test-consumer-group");
property.put("application.id", "sliding-window-min-bar");
property.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
property.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, stringSerde.getClass().getName());
Duration windowSizeMs = Duration.ofMinutes(window);
StreamsBuilder builder = new StreamsBuilder();

System.out.println(intopic);
KStream<String, String> equitybar = builder.stream(intopic, Consumed.with(stringSerde, stringSerde));

System.out.println(equitybar);

equitybar.print(Printed.toSysOut());
// convert string of csv to a double on the mean value
KStream<String, String> transformedbar = equitybar
.map((key, value) -> KeyValue.pair(key, value.substring(1,value.length()-2).split(",")[2]));
System.out.println(transformedbar);
transformedbar.print(Printed.toSysOut());

// group by equity and sliding window

System.out.println(windowSizeMs);
System.out.println(TimeWindows.of(windowSizeMs).advanceBy(advanceMs));

TimeWindowedKStream<String, String> groupedStream = transformedbar.groupByKey().windowedBy(TimeWindows.of(windowSizeMs).advanceBy(advanceMs));
System.out.println(groupedStream);
KTable<Windowed<String>, String> aggregatedTable = groupedStream.aggregate(
() -> "|",
(aggKey, newValue, aggValue) -> aggValue + newValue.trim() + "|")      ; 

我尝试使用用于Kafka流的print命令打印它-groupedStream.print(Printed.toSysOut(((;-但它似乎不起作用。

谢谢。

KGroupedStreamTimeWindowedKStream是";只是";助手类,以允许DSL向连锁运营商提供流畅的API,而不会在单个类上过载过多。

在DSL中,只有两个主要的抽象,KStreamKTable,它们是实际的一级数据容器。因此,你想做什么是不可能的。

相关内容

  • 没有找到相关文章

最新更新