与FS2.Stream分组活动



我的事件流如下:

sealed trait Event
val eventStream: fs2.Stream[IO, Event] = //...

我想在一分钟内(即从0秒到每分钟的59秒)进行分组。fs2

听起来很简单
val groupedEventsStream = eventStream groupAdjacentBy {event => 
    TimeUnit.MILLISECONDS.toMinutes(System.currentTimeMillis())
}

问题是分组功能不是纯。它使用currentTimeMillis。我可以如下:

stream.evalMap(t => IO(System.currentTimeMillis(), t))
  .groupAdjacentBy(t => TimeUnit.MILLISECONDS.toMinutes(t._1))

问题是,我想避免使用笨拙的样板。还有其他解决方案吗?

或使用不纯净的功能对这种情况并不糟糕?

您可以使用cats.effect.clock删除一些样板:

def groupedEventsStream[A](stream: fs2.Stream[IO, A])
                          (implicit clock: Clock[IO], eq: Eq[Long]): fs2.Stream[IO, (Long, Chunk[(Long, A)])] =
  stream.evalMap(t => clock.realTime(TimeUnit.MINUTES).map((_, t)))
    .groupAdjacentBy(_._1)

相关内容

  • 没有找到相关文章

最新更新