Scala中的嵌套过滤器在Future[Try[Int]]之上



我是scala的初学者,我正在尝试在Either之上实现一个过滤器逻辑。现在我有一个函数getTaskId,它返回Future[Try[Int]],而我的filter logic是基于该Int的。现在,由于filter期望boolean,我无法在下面的代码片段中返回相同的内容。

val records: List[CommittableRecord[Either[Throwable, MyEvent]]] = ???
records.filter { 
(x: CommittableRecord[Either[Throwable,MyEvent]]) => 
x.value match {
case Right(event: MyEvent) =>
getTaskId(event.get("task").get) filter {
case Success(value)     => value > 1
case Failure(exception) => false
}
case Left(_) => false
}
}

我在返回Future[Try[Int]]的函数getTaskId上得到filter的错误

type mismatch;
found   : scala.concurrent.Future[scala.util.Try[Int]]
required: Boolean

所以基本上filterFuture之上返回另一个Future,但是父filter期望boolean

非常感谢您的帮助。

您在scala中面临两个困难的特性:

  1. 很多句法糖
  2. scala中的最佳实践不是等待Future的结果在您的一些业务逻辑中使用:Await.result(future, timeout)。您应该只在宇宙的尽头使用它(在大多数情况下:在程序的末尾(

因此,我建议重构您当前的逻辑,从在结果中过滤List[CommittableRecord],使结果是非阻塞的-具有过滤记录列表的Future[List[CommittableRecord]]。您可以处理这个未来,就像它只是另一个数据容器(如Option[T](一样,并在程序调用阻塞操作结束时使用Await.result

代码示例:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future.sequence
import scala.util.{Failure, Success, Try}
case class Task()
type MyEvent = Map[String, Task]
case class CommittableRecord(value: Either[Throwable, MyEvent])
def getTaskId(task: Task): Future[Try[Int]] = ???
val records: List[CommittableRecord] = List.empty[CommittableRecord]
val result: Future[List[CommittableRecord]] = sequence(
records.map(
x =>
(x.value match {
case Left(_) => Future(false)
case Right(value) =>
getTaskId(value.get("task").get)
.map {
case Failure(_) => false
case Success(id) => id > 1
}
}).map(_ -> x)
)
).map(
idMoreThen1AndRecordList =>
idMoreThen1AndRecordList.collect {
case (true, record) => record
}
)

或者,经过一些重构并将lambda表达式替换为函数后:

def isTaskIdMoreThenOneAndRecord(record: CommittableRecord): Future[(Boolean, CommittableRecord)] =
(record.value match {
case Left(_) => Future(false)
case Right(value) =>
getTaskId(value.get("task").get)
.map(tryId => tryId.fold(_ => false, _ > 1))
}).map(_ -> record)
def filterRecordsWithTaskIdMoreThenOne(
isMoreOneAndRecordList: List[(Boolean, CommittableRecord)]
): List[CommittableRecord] =
isMoreOneAndRecordList.collect {
case (true, record) => record
}
val result: Future[List[CommittableRecord]] =
sequence(records.map(isTaskIdMoreThenOneAndRecord))
.map(filterRecordsWithTaskIdMoreThenOne)

因此,结果是您将获得Future[List[CommittableRecord]],并且您可以使用map函数处理过滤后的记录:

result.map((filteredRecords: List[CommittableRecord]) => *do something with filtered records*)

或者您可以使用flatMap组合两个非阻塞操作(例如您的列表和另一个非阻塞函数(。

有用的链接:

  • 阅读更多关于未来的scala文档
  • 关于lambda表达式的文章
  • scala并发中的一些最佳实践

最新更新