``我正在设计一个管道,让一个生产者向输入的kafka主题发布随机句子。接下来,是我的窗口字数流应用程序,它获取输入数据并对其进行操作,然后获得窗口字数(5的滚动窗口(。问题此处是
输出主题中通过消费者控制台查看的输出如下:abc 1abc 2堆栈1溢出2溢出3…等等
我想要的输出格式:
{"单词":"abc","计数":1}
现在我需要使用kafkaconnect将此发送到elasticsearch。其他一切都在起作用。这只是一个序列化错误,因为elasticsearch接受json格式的数据。
所以在我的流对输出数据进行操作后,我想要json格式的输出数据我该如何实现它。我完全被卡住了。我需要在流应用程序本身中进行转换。正在下面附加流式应用程序以进行更改。请帮忙。
KStream<String, String> initialstream = builder.stream("TextLinesTopic", Consumed.with(Serdes.String(), Serdes.String()));
KStream<String, String> Tstream = initialstream.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\W+")));
KGroupedStream<String, String> TgroupedStream = Tstream
.groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()));
KTable<Windowed<String>, Long> Ttable = TgroupedStream
.windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
.count();
Ttable
.toStream()
.selectKey((key, word) -> key.key())
.to("pipeoutput", Produced.with(Serdes.String(), Serdes.Long()));
在.to()
之前,您需要将密钥、值对.map()
转换为与您希望发送给下游消费者的格式相匹配的预期输出
您还必须将Produced.with值serde更改为不是Long,因为您将编写JSON字符串