Kafka Streams Materialized View with Kotlin



要在Java中创建kafka流状态存储,我可以这样做:

final KGroupedStream<String, String> wordCounts = textLines
.flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
.groupBy((key, word) -> word);
wordCounts.count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as(WORD_COUNT_STORE));

我正在尝试将其转换为 Kotlin,如下所示:

val wordCounts: KGroupedStream<String, String> = textLines
.flatMapValues({value -> value.split("\W+") })
.groupBy({ _, word -> word})
wordCounts.count(Materialized.<String, Long, KeyValueStore<Bytes, Array<Byte>>>as(WORD_COUNT_STORE))

但是,我收到以下编译器错误:

Interface KeyValueStore does not have constructors

我需要做什么?

如果它对其他人有用,以及拉曼建议的反引号,我不得不进行其他一些更改:

  • 首先,需要在as方法之后指定泛型类型,而不是直接在Materialized类之后指定泛型类型。
  • 其次,我必须使用ByteArray而不是使用Array<Byte>

因此,对我有用的整行代码是:

wordCounts.count(Materialized.`as`<String, Long, KeyValueStore<Bytes, ByteArray>>(WORD_COUNT_STORE))

由于as在 Kotlin 中是一个保留词,请尝试用反引号将as括起来,即

`as`