Scala Future flatMap implementation (chaining)



I'm trying to wrap my head around how asynchronous tasks are chained together, e.g. Futures and flatMap:

val a: Future[Int] = Future { 123 }
val b: Future[Int] = a.flatMap(a => Future { a + 321 })

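How would I implement something similar, where I build a new Future by first waiting for a to complete and then applying f to the result? I tried looking at the Scala source code but got stuck here: https://github.com/scala/scala/blob/2.13.x/src/library/scala/concurrent/Future.scala#L216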

I would imagine some code like:

def myFlatMap(a: Future[Int])(f: Int => Future[Int]): Future[Int] = {
  Future {
    // somehow wait for a to complete and then apply 'f'
    a.onComplete {
      case Success(x) => f(x)
    }
  }
}

But I guess the above would return a Future[Unit]. I just want to understand the "pattern": how are asynchronous tasks chained together?

(Disclaimer: I'm the main maintainer of Scala Futures.)

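You don't want to implement flatMap/recoverWith/transformWith yourself. It is a very complex feature that is hard to implement safely (stack-safe, memory-safe, and concurrency-safe).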

You can see what I'm talking about here:

  • https://github.com/scala/scala/blob/2.13.x/src/library/scala/concurrent/impl/Promise.scala#L442
  • https://github.com/scala/scala/blob/2.13.x/src/library/scala/concurrent/impl/Promise.scala#L307
  • https://github.com/scala/scala/blob/2.13.x/src/library/scala/concurrent/impl/Promise.scala#L274

Here is some more reading on the topic: https://viktorklang.com/blog/Futures-in-Scala-2.12-part-9.html

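I found that the Scala 2.12 source code made it easier for me to understand how Future/Promise are implemented. I learned that a Future is actually a Promise under the hood, and by reading the implementation I now understand how asynchronous tasks can be "chained".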
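To make the "a Future is backed by a Promise" idea concrete, here is a minimal sketch of the chaining pattern built on the public Promise API. The names NaiveFlatMap and myFlatMap are hypothetical, and this is not how the standard library implements flatMap: it ignores the stack-safety and memory-safety concerns raised in the other answer and is only meant to show the shape of the idea.

```scala
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

object NaiveFlatMap {
  // Hypothetical helper for illustration only; prefer Future#flatMap in real code.
  def myFlatMap[A, B](fa: Future[A])(f: A => Future[B])(
      implicit ec: ExecutionContext): Future[B] = {
    // The promise is the "write side" of the future we hand back to the caller.
    val p = Promise[B]()
    fa.onComplete {
      case Success(a) =>
        // f itself may throw, so guard it and fail the promise in that case.
        try p.completeWith(f(a))
        catch { case NonFatal(e) => p.failure(e) }
      case Failure(e) =>
        // Propagate the failure of the first future unchanged.
        p.failure(e)
    }
    p.future
  }

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global
    val a = Future { 123 }
    val b = myFlatMap(a)(x => Future { x + 321 })
    println(Await.result(b, 5.seconds)) // prints 444
  }
}
```

The key point is that nothing blocks: the outer future is returned immediately, and it is completed later from inside the callbacks.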

Below is the code I wrote to help me better understand what is going on under the hood.

import java.util.concurrent.{ScheduledThreadPoolExecutor, TimeUnit}
import scala.collection.mutable.ListBuffer

trait MyAsyncTask[T] {
  type Callback = T => Unit

  var result: Option[T]

  // register a new callback to be called when task is completed.
  def onComplete(callback: Callback): Unit

  // complete the given task, and invoke all registered callbacks.
  def complete(value: T): Unit

  // create a new task, that will wait for the current task to complete and apply
  // the provided map function, and complete the new task when completed.
  def flatMap[B](f: T => MyAsyncTask[B]): MyAsyncTask[B] = {
    val wrapper = new DefaultAsyncTask[B]()
    onComplete { x =>
      f(x).onComplete { y =>
        wrapper.complete(y)
      }
    }
    wrapper
  }

  def map[B](f: T => B): MyAsyncTask[B] = flatMap(x => CompletedAsyncTask(f(x)))
}

/**
 * task with fixed pre-calculated result.
 */
case class CompletedAsyncTask[T](value: T) extends MyAsyncTask[T] {
  override var result: Option[T] = Some(value)

  override def onComplete(callback: Callback): Unit = {
    // we already have the result just call the callback.
    callback(value)
  }

  override def complete(value: T): Unit = () // noop nothing to complete.
}

// note: this toy implementation is not thread-safe.
class DefaultAsyncTask[T] extends MyAsyncTask[T] {
  override var result: Option[T] = None
  var isCompleted = false
  var listeners = new ListBuffer[Callback]()

  /**
   * register callback, to be called when task is completed.
   */
  override def onComplete(callback: Callback): Unit = {
    if (isCompleted) {
      // already completed just invoke callback
      callback(result.get)
    } else {
      // add the listener
      listeners.addOne(callback)
    }
  }

  /**
   * trigger all registered callbacks to `onComplete`
   */
  override def complete(value: T): Unit = {
    result = Some(value)
    // mark as completed so that callbacks registered later fire immediately
    isCompleted = true
    listeners.foreach { listener =>
      listener(value)
    }
  }
}

object MyAsyncTask {
  def apply[T](body: (T => Unit) => Unit): MyAsyncTask[T] = {
    val task = new DefaultAsyncTask[T]()
    // pass in `complete` as callback.
    body(task.complete)
    task
  }
}

object MyAsyncFlatMap {
  /**
   * helper to simulate async task.
   */
  def delayedCall(body: => Unit, delay: Int): Unit = {
    val scheduler = new ScheduledThreadPoolExecutor(1)
    val run = new Runnable {
      override def run(): Unit = {
        body
      }
    }
    scheduler.schedule(run, delay, TimeUnit.SECONDS)
    // already-scheduled delayed tasks still run after shutdown (the default policy);
    // shutting down lets the worker thread exit so the JVM can terminate.
    scheduler.shutdown()
  }

  def main(args: Array[String]): Unit = {
    val getAge = MyAsyncTask[Int] { cb =>
      delayedCall({ cb(66) }, 1)
    }
    val getName = MyAsyncTask[String] { cb =>
      delayedCall({ cb("John") }, 2)
    }
    // same as: getAge.flatMap(age => getName.map(name => (age, name)))
    val result: MyAsyncTask[(Int, String)] = for {
      age <- getAge
      name <- getName
    } yield (age, name)
    result.onComplete {
      case (age, name) => println(s"hello $name $age")
    }
  }
}
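
The toy task above keeps its state in plain vars, so a callback registered on one thread while another thread completes the task can be lost. As a rough illustration of the "concurrency-safe" concern (not of what scala.concurrent actually does, which uses lock-free techniques), here is a minimal sketch that guards the state with a lock. SyncTask and SyncTaskDemo are hypothetical names introduced only for this example.

```scala
// Hypothetical, simplified single-assignment cell with callbacks.
final class SyncTask[T] {
  private var result: Option[T] = None
  private var listeners: List[T => Unit] = Nil

  // Register a callback; if the value is already there, run it right away.
  def onComplete(cb: T => Unit): Unit = {
    val ready: Option[T] = synchronized {
      result match {
        case None => listeners = cb :: listeners; None
        case some => some
      }
    }
    ready.foreach(cb) // invoke outside the lock
  }

  // Complete at most once; returns false if a value was already set.
  def complete(value: T): Boolean = {
    val toRun: Option[List[T => Unit]] = synchronized {
      if (result.isDefined) None
      else {
        result = Some(value)
        val registered = listeners
        listeners = Nil
        Some(registered.reverse) // run in registration order
      }
    }
    toRun.foreach(_.foreach(cb => cb(value)))
    toRun.isDefined
  }
}

object SyncTaskDemo {
  def main(args: Array[String]): Unit = {
    val task = new SyncTask[Int]
    task.onComplete(v => println(s"early listener got $v"))
    println(task.complete(42)) // runs the early listener, then prints true
    println(task.complete(7))  // prints false: already completed
    task.onComplete(v => println(s"late listener got $v")) // late listener got 42
  }
}
```

Because registration and completion both take the same lock, a callback is either stored before the value is set or sees the value immediately. It still runs callbacks on the caller's thread and makes no attempt at stack safety.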
