我想实现的目标,例如,给定数据:
time, part, data
0, a, 3
1, a, 4
2, b, 10
3, b, 20
3, a, 5
和转换:
stream.keyBy(_.part).scan(0)((s, d) => s + d)
获取:
0, a, 3
1, a, 7
2, b, 10
3, b, 30
3, a, 12
我已经尝试使用groupAdjacentBy
进行了分区,但是它变得太复杂了,因为我需要用密钥在每个块之间保留复杂的状态。我想知道是否有类似的flink datastream.keyby?还是实施它的简单方法?
好吧,我找到了有趣的解决方案(尽管不能是 flatten
)
所述的问题可以通过"分区"来解决。在扫描操作本身中:
import cats.implicits._
import cats.effect.IO
import fs2._
case class Element(time: Long, part: Symbol, value: Int)
val elements = Stream(
Element(0, 'a, 3),
Element(1, 'a, 4),
Element(2, 'b, 10),
Element(3, 'b, 20),
Element(3, 'a, 5)
)
val runningSumsByPart = elements
.scan(Map.empty[Symbol, Int] -> none[Element]) {
case ((sums, _), el@Element(_, part, value)) =>
val sum = sums.getOrElse(part, 0) + value
(sums + (part -> sum), el.copy(value = sum).some)
}
.collect { case (_, Some(el)) => el }
runningSumsByPart.covary[IO].evalTap(el => IO { println(el) }).compile.drain.unsafeRunSync()
输出:
元素(0,'a,3)
元素(1,'a,7)
元素(2,'b,10)
元素(3,'b,30)
元素(3,'a,12)
我做了这样的事情。首先分开,然后合并。我还不知道如何返回2个流。我只知道如何在一个地方处理它们,然后将它们合并在一起。
val notEqualS = in
.filter(_.isInstanceOf[NotEqual])
.map(_.asInstanceOf[NotEqual])
...
val invalidS = in
.filter(_.isInstanceOf[Invalid])
.map(_.asInstanceOf[Invalid])
...
notEqualS.merge(invalidS)