Scala Monix:从Observer.foldLeft收集数据



通过使用Observer,我试图构建代码,其中:

1。生成一些(随机)值;

2。

3。如果某个组合值超过阈值,则必须将该值传递给另一个处理程序。因此,我希望返回结果值以供进一步使用

我代码:

//generate
val o: Observable[Int] = Observable.repeatEval(Random.nextInt(10))
//handle 
val f = o.foldLeft(0) { (acc, el) =>
if (acc < 15) {
el + acc
} else {
println("handled " + acc)
acc
}
}
//use handled
.flatMap{res =>
println("mapped " + res + 1)
Observable(res + 1)
}

但是没有传递给flatmap -方法。输出如下所示:

0
3
7
11
12
handled 20

我做错了什么?

您应该使用mapAccumulate+collect

def groupWhile[A, B](o: Observable[A], s: B)(op: (B, A) => B)(predicate: B => Boolean): Observable[B] =
o.mapAccumulate(seed = s) {
case (b, a) =>
val bb = op(b, a)
if (predicate(bb)) (bb, None) else (s, Some(bb))
} collect {
case Some(b) => b
}

使用方式:

// Generate.
val o: Observable[Int] = Observable.repeatEval(Random.nextInt(10))
// Handle.
val f = groupWhile(o, 0)((acc, i) => acc + i)(r => r <= 15)
// Use handled.
f.mapEval { x =>
val res = x + 1
Task(println("Mapped: ${res}")).as(res)
}

就像我一直说的,Scaladoc是你的朋友。

最新更新