Scala KStreams从许多过滤器转移到拆分和分支



假设这里所有未定义的方法都有有效的签名。我有以下代码可以完美地工作:

// Add validation tags to the stream
val validated: KStream[K, V] = stream.mapValues(x => validate(x))
// Predicate to separate valid and invalid regions
val isInvalid: (K, V) => Boolean = (_: K, v: V) => !v.isValid
// Branch the rejects out of the main stream and into a rejects topic
val incAndReject: KStream[K, V] => Unit =
_.mapValues(f1)
.peek(incMetric1)
.to("bad-topic")

现在我有以下工作片段:

validated.peek((_, v) => incMetric2(v))
val temp = validated.filter(isInvalid)
incAndReject(temp)
validated.filter((_, v) => v.isValid)

我正试图将其更改为,我认为是,使用拆分和分支而不是过滤器的等效代码,但它不起作用:

// Branched names are for the topology, they do not affect topic names (?)
val rejectsBranch: Branched[K, V] = Branched.withConsumer[K, V](incAndReject, "invalid-region")
validatedEnrichedStream
.peek((_, v) => incMetric2(v))
.split()
.branch(isInvalid, rejectsBranch)
.defaultBranch()
.head
._2

我做得对吗?关于KStreams分支流的文档非常糟糕,而且仅在Java 中提供

键在映射元素的键上,与分支名称略有不同,因此.head._2工作,而.get("name")

最新更新