我需要知道如何在我的kafka kstreams系列中使用'循环...下面是我的'for'for'loop,它需要包含在kstreams
中for (int i = 0; i < 6 ; i++) {
try {
textlines.flatMapValues(value -> Arrays.asList(value.split("\},\{")));
Thread.sleep(2000);
}catch (InterruptedException e){
e.printStackTrace();
}
}
和我的kstreams looks,例如
KStream<String, String> textlines = builder.stream("intopic");
KStream<String, String> mstream = textlines
.mapValues(value -> value.replace("[","" ) )
如何将上面的"'for"循环添加到我的kstreams
问题是我使用了值。在另一个
之后
根据您要订购的内容。为了实现订购,您不需要sleep
。它只会工作。我认为您的代码基于的Kafka流量示例以相同的方式工作:它也使用flatMapValues
,而传递到平面地图中的lambda将文本行分为单词。
除非我和其他人误解了您的问题(在这种情况下,您也许应该进一步澄清您的问题),我认为您不必要地使您的代码复杂化。