Converting a stream of Coproducts into an HList - Shapeless and FS2 streams



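I have an fs2 stream Stream[F, C], where C <: Coproduct. I want to turn it into a Stream[F, H], where H <: HList. This HList should contain all the members of the coproduct C.

So, essentially, a Pipe[F, C, H].

The fs2 Pipe would wait until at least one value of every member of the coproduct has been pulled and, once that has happened, combine those values into an HList and output it.

So it would be used roughly like this: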

type MyCoprod = A :+: B :+: C :+: CNil
type MyHList = A :: B :: C :: HNil
val stream: Stream[F, MyHList] = Stream
  .emits(List(A, B, C)) // my coproducts
  .through(pullAll) // I want to wait until A, B and C have each been pulled at least once, then output them
  .map { hlist => ... }

I'm new to Shapeless, and this is as far as I got before hitting a roadblock:

import fs2.{Pipe, Pull, Stream}
import shapeless._
import shapeless.ops.coproduct.ToHList

trait WaitFor[F[_], C <: Coproduct] {
  type Out <: HList
  def apply: Pipe[F, C, Out]
}
object WaitFor {
  type Aux[F[_], C <: Coproduct, Out0 <: HList] =
    WaitFor[F, C] { type Out = Out0 }
  implicit def make[F[_], C <: Coproduct, L <: HList](implicit
    toHList: ToHList.Aux[C, L]
  ): Aux[F, C, L] = new WaitFor[F, C] {
    override type Out = L
    override def apply: Pipe[F, C, Out] = { s1 =>
      def go(s2: Stream[F, C], currHList: L): Pull[F, L, Unit] = {
        s2.pull.uncons1.flatMap {
          case Some((coproduct, s3)) => {
            // add or update coproduct member to currHList
            // if currHList is the same as L (our output type) then output it (Pull.output1(currHList)) and clear currHList
            // if not, keep iterating:
            go(s3, ???)
          }
          case None => Pull.done
        }
      }
      go(s1, ???).stream
    }
  }
  def pullAll[F[_], C <: Coproduct](
    stream: Stream[F, C]
  )(implicit ev: WaitFor[F, C]): Stream[F, ev.Out] = {
    stream.through(ev.apply)
  }
}

My roadblock starts here:

override def apply: Pipe[F, C, Out] = ???

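That is where my knowledge of Shapeless runs out.

My idea was to keep track of all the coproduct members in a tuple (Option[C1], Option[C2], ...).

Once every element of the tuple is a Some, I would convert them into an HList and output it to the stream.

(I will be using an FS2 Pull to track the state recursively, so I'm not worried about that part.)

My problem is that, at the value level, I have no way of knowing the length of the tuple, and no way of constructing one.

Any pointers on how I could solve this?

Thanks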

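Let's do it step by step:

  • your input will be A :+: B :+: C :+: CNil
  • you will store somewhere the latest A, the latest B and the latest C
  • initially there won't be any latest values
  • once all values have been found, you should emit A :: B :: C :: HNil
  • when you emit a new HList value, you should also reset your intermediate value storage
  • this suggests that it would be convenient to store these intermediate values as Option[A] :: Option[B] :: Option[C] :: HNil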
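
Before generalising, the steps above can be sketched for just two members without any library. This is only an illustration under simplifying assumptions: Either[String, Int] stands in for the coproduct, a tuple of Options stands in for the cache, and the names TwoMemberCollector, update, complete and run are made up for this sketch.

```scala
object TwoMemberCollector {
  // Stand-in for a two-member coproduct
  type Input = Either[String, Int]
  // One Option slot per member, holding the latest value seen
  type Cache = (Option[String], Option[Int])

  val emptyCache: Cache = (None, None)

  // Last-wins update of the slot that matches the incoming value
  def update(in: Input, cache: Cache): Cache = in match {
    case Left(s)  => (Some(s), cache._2)
    case Right(i) => (cache._1, Some(i))
  }

  // Some only when every slot is filled
  def complete(cache: Cache): Option[(String, Int)] =
    for {
      s <- cache._1
      i <- cache._2
    } yield (s, i)

  // Thread the cache through the inputs, emitting and resetting
  // whenever the cache becomes complete
  def run(inputs: List[Input]): List[(String, Int)] = {
    val (_, emitted) =
      inputs.foldLeft((emptyCache, Vector.empty[(String, Int)])) {
        case ((cache, out), in) =>
          val next = update(in, cache)
          complete(next) match {
            case Some(pair) => (emptyCache, out :+ pair)
            case None       => (next, out)
          }
      }
    emitted.toList
  }
}
```

With the inputs Left("a"), Left("b"), Right(1), Right(2), Left("c") this yields ("b", 1) followed by ("c", 2): the later "b" overwrites "a", and the cache is reset after each emission.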

So, let's write a type class to help us:

import shapeless._

// A type class for collecting Coproduct elements (last-wins)
// until they could be combined into an HList element
// Path-dependent types and Aux for better DX, e.g. when one
// would want Collector[MyType] without manually entering HLists
trait Collector[Input] {
  type Cache
  type Result
  // pure computation of an updated cache
  def updateState(newInput: Input, currentState: Cache): Cache
  // returns Some if all elements of Cache are Some, None otherwise
  def attemptConverting(updatedState: Cache): Option[Result]
  // HLists of Nones
  def emptyCache: Cache
}
object Collector {
  type Aux[Input, Cache0, Result0] = Collector[Input] {
    type Cache = Cache0
    type Result = Result0
  }
  def apply[Input](implicit
    collector: Collector[Input]
  ): Collector.Aux[Input, collector.Cache, collector.Result] =
    collector
  // obligatory empty Coproduct/HList case to terminate recursion
  implicit val nilCollector: Collector.Aux[CNil, HNil, HNil] =
    new Collector[CNil] {
      type Cache = HNil
      type Result = HNil
      override def updateState(newInput: CNil, currentState: HNil): HNil = HNil
      override def attemptConverting(updatedState: HNil): Option[HNil] =
        Some(HNil)
      override def emptyCache: HNil = HNil
    }
  // here we define the actual recursive derivation
  implicit def consCollector[
    Head,
    InputTail <: Coproduct,
    CacheTail <: HList,
    ResultTail <: HList
  ](implicit
    tailCollector: Collector.Aux[InputTail, CacheTail, ResultTail]
  ): Collector.Aux[
    Head :+: InputTail,
    Option[Head] :: CacheTail,
    Head :: ResultTail
  ] = new Collector[Head :+: InputTail] {
    type Cache = Option[Head] :: CacheTail
    type Result = Head :: ResultTail
    override def updateState(
      newInput: Head :+: InputTail,
      currentState: Option[Head] :: CacheTail
    ): Option[Head] :: CacheTail = newInput match {
      case Inl(head) => Some(head) :: currentState.tail
      case Inr(tail) =>
        currentState.head :: tailCollector.updateState(
          tail,
          currentState.tail
        )
    }
    override def attemptConverting(
      updatedState: Option[Head] :: CacheTail
    ): Option[Head :: ResultTail] = for {
      head <- updatedState.head
      tail <- tailCollector.attemptConverting(updatedState.tail)
    } yield head :: tail
    override def emptyCache: Option[Head] :: CacheTail =
      None :: tailCollector.emptyCache
  }
}
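
The Inl/Inr match in the recursive case works because of how Shapeless encodes a coproduct value: a value of the first member is wrapped in Inl, and a value of any later member is wrapped in Inr around the encoding for the rest. A small sketch of that encoding, assuming Shapeless 2.x; the names CoproductShape and describe are hypothetical and exist only for this illustration.

```scala
import shapeless._

object CoproductShape {
  type Input = String :+: Int :+: CNil

  // Coproduct[Input](value) injects a value at the position of its type
  val fromString: Input = Coproduct[Input]("a") // encoded as Inl("a")
  val fromInt: Input    = Coproduct[Input](1)   // encoded as Inr(Inl(1))

  // Walk the encoding the same way a head/tail derivation does:
  // Inl means "this member", Inr means "look further in the tail"
  def describe(in: Input): String = in match {
    case Inl(s)         => s"first member (String): $s"
    case Inr(Inl(i))    => s"second member (Int): $i"
    case Inr(Inr(cnil)) => cnil.impossible // CNil has no values
  }
}
```

Each level of the recursive instance only has to handle its own Inl and delegate every Inr to the instance for the tail, which is why the CNil instance is needed to terminate the derivation even though no CNil value can ever reach it.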

This code makes no assumptions about how the cache is stored or how it is updated, so we can test it with some impure code:

import shapeless.ops.coproduct.Inject

type Input = String :+: Int :+: Double :+: CNil
val collector = Collector[Input]
// dirty, but good enough for demo
var cache = collector.emptyCache
LazyList[Input](
  Inject[Input, String].apply("test1"),
  Inject[Input, String].apply("test2"),
  Inject[Input, String].apply("test3"),
  Inject[Input, Int].apply(1),
  Inject[Input, Int].apply(2),
  Inject[Input, Int].apply(3),
  Inject[Input, Double].apply(3),
  Inject[Input, Double].apply(4),
  Inject[Input, Double].apply(3),
  Inject[Input, String].apply("test4"),
  Inject[Input, Int].apply(4),
).foreach { input =>
  val newCache = collector.updateState(input, cache)
  collector.attemptConverting(newCache) match {
    case Some(value) =>
      // interpolate the computed HList; the original printed the literal word "value"
      println(s"Product computed: $value!")
      cache = collector.emptyCache
    case None =>
      cache = newCache
  }
  println(s"Current cache: $cache")
}

We can check with Scastie that it prints what we expect:

Current cache: Some(test1) :: None :: None :: HNil
Current cache: Some(test2) :: None :: None :: HNil
Current cache: Some(test3) :: None :: None :: HNil
Current cache: Some(test3) :: Some(1) :: None :: HNil
Current cache: Some(test3) :: Some(2) :: None :: HNil
Current cache: Some(test3) :: Some(3) :: None :: HNil
Product computed: test3 :: 3 :: 3.0 :: HNil!
Current cache: None :: None :: None :: HNil
Current cache: None :: None :: Some(4.0) :: HNil
Current cache: None :: None :: Some(3.0) :: HNil
Current cache: Some(test4) :: None :: Some(3.0) :: HNil
Product computed: test4 :: 4 :: 3.0 :: HNil!
Current cache: None :: None :: None :: HNil

Now, the question is how to thread this intermediate result through an FS2 Stream. One way is to use a Ref:

// assumes cats-effect 3 (in cats-effect 2, Ref lives in cats.effect.concurrent)
import cats.effect.{IO, Ref}
import fs2.Stream

for {
  // for easy passing of cache around
  cacheRef <- Stream.eval(Ref[IO].of(collector.emptyCache))
  // source of Coproducts
  input <- Stream[IO, Input](
    Inject[Input, String].apply("test1"),
    Inject[Input, String].apply("test2"),
    Inject[Input, String].apply("test3"),
    Inject[Input, Int].apply(1),
    Inject[Input, Int].apply(2),
    Inject[Input, Int].apply(3),
    Inject[Input, Double].apply(3)
  )
  updateCache = cacheRef.modify[Stream[IO, collector.Result]] { cache =>
    val newCache = collector.updateState(input, cache)
    collector.attemptConverting(newCache) match {
      case Some(value) => collector.emptyCache -> Stream(value)
      case None        => newCache -> Stream.empty
    }
  }
  // emits a new HList only once all of its elements have been gathered
  hlist <- Stream.eval(updateCache).flatten
} yield hlist

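You can adapt this code to your own taste: extract updateCache into a function, use a State monad, or anything else. Turning it into a pipe might look like this, for example: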

// assumes cats-effect 3 (in cats-effect 2, Ref lives in cats.effect.concurrent)
import cats.effect.{IO, Ref}
import fs2.{Pipe, Stream}

// you might replace cats.effect.IO with F[_]: Monad, use something
// else instead of Ref, or whatever
def collectCoproductsToHList[Input](
  implicit collector: Collector[Input]
): IO[Pipe[IO, Input, collector.Result]] =
  Ref[IO].of(collector.emptyCache).map { cacheRef =>

    val pipe: Pipe[IO, Input, collector.Result] = inputStream => for {
      input <- inputStream
      updateCache = cacheRef.modify[Stream[IO, collector.Result]] { cache =>
        val newCache = collector.updateState(input, cache)
        collector.attemptConverting(newCache) match {
          case Some(value) => collector.emptyCache -> Stream(value)
          case None        => newCache             -> Stream.empty
        }
      }
      hlist <- Stream.eval(updateCache).flatten
    } yield hlist

    pipe
  }
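
If the cache does not need to be shared outside the pipe, the state can also be threaded purely instead of through a Ref. The following is a rough sketch of that idea, assuming fs2 3.x, where mapAccumulate and unNone are methods on Stream. To keep it self-contained, the type class is replaced by three plain parameters and Either stands in for the coproduct; collectWith and PureCollect are hypothetical names, not part of fs2.

```scala
import fs2.{Pipe, Pure, Stream}

object PureCollect {
  // empty: the initial and reset state
  // update: pure computation of the next state from an input
  // complete: Some(result) once the state holds every element
  def collectWith[F[_], I, S, O](empty: S)(update: (I, S) => S)(
      complete: S => Option[O]
  ): Pipe[F, I, O] =
    input =>
      input
        .mapAccumulate(empty) { (state, in) =>
          val next = update(in, state)
          complete(next) match {
            case Some(out) => (empty, Option(out))     // emit and reset
            case None      => (next, Option.empty[O])  // keep collecting
          }
        }
        .map(_._2) // drop the state, keep the optional output
        .unNone    // keep only the completed results

  type In    = Either[String, Int]
  type Cache = (Option[String], Option[Int])

  val pipe: Pipe[Pure, In, (String, Int)] =
    collectWith[Pure, In, Cache, (String, Int)]((None, None))((in, cache) =>
      in match {
        case Left(s)  => (Some(s), cache._2)
        case Right(i) => (cache._1, Some(i))
      }
    )(cache => for { s <- cache._1; i <- cache._2 } yield (s, i))

  val result: List[(String, Int)] =
    Stream
      .emits(List[In](Left("a"), Right(1), Left("b"), Left("c"), Right(2)))
      .through(pipe)
      .toList
}
```

With a Collector instance in scope, the three parameters should line up with collector.emptyCache, collector.updateState and collector.attemptConverting, though that wiring is untested here.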

Just adding to @Mateusz Kubuszok's amazing answer, this is how I decided to store the Collector cache (the fs2 Pull way):

import fs2.{Pipe, Pull, Stream}
import shapeless._

trait CollectorPipe[F[_], C <: Coproduct] {
  type Out <: HList
  def pipe: Pipe[F, C, Out]
}
object CollectorPipe {
  type Aux[F[_], C <: Coproduct, Out0 <: HList] =
    CollectorPipe[F, C] { type Out = Out0 }
  def instance[F[_], C <: Coproduct, Out0 <: HList](tubo: Pipe[F, C, Out0]): Aux[F, C, Out0] =
    new CollectorPipe[F, C] {
      override type Out = Out0
      override def pipe: Pipe[F, C, Out0] = tubo
    }
  // Cache0 and Out0 are inferred through Collector.Aux, so the compiler
  // can see that the result type is an HList, as Aux and instance require
  implicit def make[
    F[_],
    C <: Coproduct,
    Cache0,
    Out0 <: HList
  ](implicit
    collector: Collector.Aux[C, Cache0, Out0]
  ): Aux[F, C, Out0] = instance { s1 =>
    def go(s2: Stream[F, C], curr: Cache0): Pull[F, Out0, Unit] = {
      s2.pull.uncons1.flatMap {
        case Some((c, s3)) => {
          val newState = collector.updateState(c, curr)
          collector.attemptConverting(newState) match {
            case Some(value) => Pull.output1(value) >> go(s3, collector.emptyCache)
            case None        => go(s3, newState)
          }
        }
        case None          => Pull.done
      }
    }
    go(s1, collector.emptyCache).stream
  }
  implicit class CollectorPipeStreamOps[F[_], A <: Coproduct](private val s: Stream[F, A]) {
    def pullAll(implicit ev: CollectorPipe[F, A]): Stream[F, ev.Out] = s.through(ev.pipe)
  }
}
