如何按键分隔FS2流以分别转换每个分区



我想实现的目标,例如,给定数据:

time, part, data
0, a, 3
1, a, 4
2, b, 10
3, b, 20
3, a, 5

和转换:

stream.keyBy(_.part).scan(0)((s, d) => s + d)

获取:

0, a, 3
1, a, 7
2, b, 10
3, b, 30
3, a, 12

我已经尝试使用groupAdjacentBy进行了分区,但是它变得太复杂了,因为我需要用密钥在每个块之间保留复杂的状态。我想知道是否有类似的flink datastream.keyby?还是实施它的简单方法?

好吧,我找到了有趣的解决方案(尽管不能是 flatten

所述的问题可以通过"分区"来解决。在扫描操作本身中:

import cats.implicits._
import cats.effect.IO
import fs2._
case class Element(time: Long, part: Symbol, value: Int)
val elements = Stream(
  Element(0, 'a, 3),
  Element(1, 'a, 4),
  Element(2, 'b, 10),
  Element(3, 'b, 20),
  Element(3, 'a, 5)
)
val runningSumsByPart = elements
  .scan(Map.empty[Symbol, Int] -> none[Element]) {
    case ((sums, _), el@Element(_, part, value)) =>
      val sum = sums.getOrElse(part, 0) + value
      (sums + (part -> sum), el.copy(value = sum).some)
  }
  .collect { case (_, Some(el)) => el }
runningSumsByPart.covary[IO].evalTap(el => IO { println(el) }).compile.drain.unsafeRunSync()

输出:

元素(0,'a,3)

元素(1,'a,7)

元素(2,'b,10)

元素(3,'b,30)

元素(3,'a,12)

我做了这样的事情。首先分开,然后合并。我还不知道如何返回2个流。我只知道如何在一个地方处理它们,然后将它们合并在一起。

    val notEqualS = in
      .filter(_.isInstanceOf[NotEqual])
      .map(_.asInstanceOf[NotEqual])
      ...
    val invalidS = in
      .filter(_.isInstanceOf[Invalid])
      .map(_.asInstanceOf[Invalid])
      ...
    notEqualS.merge(invalidS)

相关内容

  • 没有找到相关文章

最新更新