用猫创建抽象集合的有效方式



我有一些代码使用MonixObservable对文件进行流处理。为了测试这段代码,我希望我在Observable上所做的操作与类型无关,这样我也可以在任何其他数据结构(如List(上执行它们。这就是为什么我编写了以下代码来抽象底层数据结构:

def permutations[F[_] : Applicative : FunctorFilter : SemigroupK](chars: F[Char]): F[F[Char]] = {
Range.inclusive('a', 'z').map(_.toChar)
.map { c ⇒
FunctorFilter[F].filter(chars)(Character.toLowerCase _ andThen (_ != c))
}
.map(Applicative[F].pure)
.reduceLeft(SemigroupK[F].combineK)
}

让我感到困扰的是,这段代码创建了很多中间数据结构。有没有一个类型类可以让这个过程更高效?可以在没有太多开销的情况下将一个数据结构提升到另一个结构中的东西,比如LiftIO,但用于项的集合?

看起来猫对此没有任何贡献。monix也好不到哪里去,它只实现了少数来自cat的类型类。

所以,我最好的猜测是自己定义这样的类型类:

import monix.execution.Scheduler.Implicits.global
import cats._
import cats.implicits._
import monix.reactive._
object Test {
def main(args: Array[String]): Unit = {
println(permutations(List('a', 'b', 'c')))
permutations(Observable('a', 'b', 'c')).foreach{c =>
print("Observable(")
c.foreach(c1 => print(c1 + " "))
print(") ")
}
}
def permutations[F[_] : Applicative](chars: F[Char])(implicit seq: Sequence[F], fil: Filter[F]): F[F[Char]] = {
val abc = seq.fromIterable(
Range.inclusive('a', 'z').map(_.toChar)
)
abc.map(c => fil.filter(chars)(_ != c))
}
trait Sequence[F[_]] {
def fromIterable[A](f: Iterable[A]): F[A]
}
implicit val listSequence: Sequence[List] = new Sequence[List] {
def fromIterable[A](f: Iterable[A]): List[A] = f.toList
}
implicit val observableSequence: Sequence[Observable] = new Sequence[Observable] {
def fromIterable[A](f: Iterable[A]): Observable[A] = Observable.fromIterable(f)
}
trait Filter[F[_]] {
def filter[A](fa: F[A])(f: A => Boolean): F[A]
}
implicit val observableFilterFunctor: Filter[Observable] = new Filter[Observable] {
def filter[A](fa: Observable[A])(f: A => Boolean): Observable[A] =
fa.filter(f)
}
implicit val listFilterFunctor: Filter[List] = new Filter[List] {
def filter[A](fa: List[A])(f: A => Boolean): List[A] =
fa.filter(f)
}
}

结果:

List(List(b, c), List(a, c), List(a, b), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c), List(a, b, c))
Observable(b c ) Observable(a c ) Observable(a b ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) Observable(a b c ) 

遗憾的是,我无法在scalafiddle或scastie上实现这一点,因为两者都没有提供正确的cat(1.5.0(和monix(3.0.0-M3(版本。

我仍然希望这能有所帮助。

尽管创建可重复使用的函数很有用,但您可以轻松地测试Observable,而无需执行此操作。

我建议将逻辑处理和副作用消费者分开

object StreamProcessing {
def processItems(obs: Observable[Input]): Observable[Result] = ???
}

在prod中,你会做

val eventsStream: Observable[Input] = ???
val eventsConsumer: Consumer[Input, Output] = ???
StreamProcessing(myEventsStream).consumeWith(eventsConsumer)

然后,在测试中,您可以模拟测试数据,断言列表结果。此外,通过测试Observable,您可以使用TestScheduler控制时间,这使得测试变得轻而易举。

implicit val sc = TestScheduler()
val testData: List[Input] = ???
val expected: List[Output] = ???
val res = StreamProcessing(Observable.fromIterable(testData))
.toListL
.runToFuture
sc.tick()
assert(res.value, Some(Success(expected))

相关内容

  • 没有找到相关文章

最新更新