使用Doobie中资源内的事务处理程序返回实体流



我正在尝试实现一个查询,该查询返回fs2.Stream中提取的信息。我定义了Jobs代数:

trait Jobs[F[_]] {
def all(): fs2.Stream[F, Job]
}

然后,我实现了代数的解释器:

final class LiveJobs[F[_]: MonadCancelThrow](postgres: Resource[F, Transactor[F]]) extends Jobs[F] {
override def all(): fs2.Stream[F, Job] = for {
jobs <- postgres.use { xa =>
sql"SELECT * FROM jobs".query[Job].stream.transact(xa)
}
} yield jobs
}

然而,由于类型没有对齐,编译器会大喊:

type mismatch;
[error]  found   : fs2.Stream[[_]F[_],Job]
[error]  required: F[?]
[error]       sql"SELECT * FROM jobs".query[Job].stream.transact(xa)
[error]                                                         ^
[error] one error found

Resource.use方法需要一个生成F[*]而不是fs2.Stream[F, Job]的函数。我找不到任何可以在这两种类型之间转换的内容,也找不到使用postgres资源的不同方式。

以下可能是您想要遵循的设计:

trait Jobs[F[_]] {
def all: fs2.Stream[F, Job] =
}
object Jobs {
// I am not exactly sure which typeclass you require here, so i will use Async
def live[F[_]](implicit ev: Async[F]): Resource[F, Jobs[F]] = {
val transactor: Resource[F, Transactor[F]] = ... // Whatever you already have here.
transactor.map(xa => new LiveJobs(xa))
}
}
private[pckg] final class LiveJobs[F[_]](xa: Transactor[F])(implicit ev: MonadCancelThrow[F]) extends Jobs[F] {
override final val all: fs2.Stream[F, Job] =
sql"SELECT * FROM jobs".query[Job].stream.transact(xa)
}

另外,我个人的建议是,在学习的同时坚持具体的IO;甚至可能在之后。整个F[_]事件只会造成比最初更大的麻烦。

最新更新