对 Scala 未来的不一致行为有问题



我面临着一个小问题,Scala的Future的行为不一致。下面是我有的代码

package TryFuture
import scala.concurrent.Future
import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random
class FutureCappuccino {
  // Some type aliases, just for getting more meaningful method signatures:
  type CoffeeBeans = String
  type GroundCoffee = String
  case class Water(temperature: Int)
  type Milk = String
  type FrothedMilk = String
  type Espresso = String
  type Cappuccino = String
  // some exceptions for things that might go wrong in the individual steps
  // (we'll need some of them later, use the others when experimenting
  // with the code):
  case class GrindingException(msg: String) extends Exception(msg)
  case class FrothingException(msg: String) extends Exception(msg)
  case class WaterBoilingException(msg: String) extends Exception(msg)
  case class BrewingException(msg: String) extends Exception(msg)
  def grind(beans: CoffeeBeans): Future[GroundCoffee] = future {
    println("Start grinding with thread: " + Thread.currentThread().getId())
    //Thread.sleep(10)
    if (beans == "baked beans") throw GrindingException("are you joking?")
    println("Finished grinding..")
    s"ground coffee of $beans"
  }
  def heatWater(water: Water): Future[Water] = future {
    println("Heating the water with thread: " + Thread.currentThread().getId())
    //Thread.sleep(10)
    println("It's hot!!")
    water.copy(temperature = 85)
  }
  def frothMilk(milk: Milk): Future[FrothedMilk] = future {
    println("milk frothing system engaged! with thread: " + Thread.currentThread().getId())
    //Thread.sleep(Random.nextInt(10))
    println("shutting down milk frothing system")
    s"frothed $milk"
  }
  def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = future {
    println("happy brewing :) with thread: " + Thread.currentThread().getId())
    //Thread.sleep(Random.nextInt(10))
    println("it's brewed!")
    s"espresso"
  }
  def combine(espresso: Espresso, frothedMilk: FrothedMilk): Future[String] = future {
    println("Combine starting with thread: " + Thread.currentThread().getId())
    "Your Cappuccino is ready"
  }
  // going through these steps sequentially:
  def prepareCappuccino() = {
    val brewed = for {
      coffee <- grind("not baked beans")
      water <- heatWater(Water(25))
      brewed <- brew(coffee, water)
    } yield brewed
    brewed
  }
}

现在我所期待的是,水会等待研磨完成,然后酿造会等待研磨和加热水完成。但是当我在我的MacBook中运行这个程序时,我只得到下面的o/p用螺纹开始研磨:8,然后执行完成。它不是在等待剩下的东西。找不到我在这里缺少的东西,为什么它不在等待?有什么帮助吗?

def grind(beans: CoffeeBeans): Future[GroundCoffee] = future {
    println("Start grinding with thread: " + Thread.currentThread().getId())
    if (beans == "baked beans") throw GrindingException("are you joking?")
    ...
}
def prepareCappuccino() = {
  val brewed = for {
    coffee <- grind("baked beans")
    water <- heatWater(Water(25))
    brewed <- brew(coffee, water)
  } yield brewed
  brewed
}

grind方法正在引发异常,导致for表达式导致失败将来

grind被传递为"未烘焙的豆子"时,它会按预期工作。

你的主线程退出得太早了。

使用Thread.sleep或Await.result

最终通过以下更改明确了概念:

package TryFuture
import scala.concurrent.Future
import scala.concurrent.future
import scala.util.{Success,Failure}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Random
class FutureCappuccino {
  // Some type aliases, just for getting more meaningful method signatures:
  type CoffeeBeans = String
  type GroundCoffee = String
  case class Water(temperature: Int)
  type Milk = String
  type FrothedMilk = String
  type Espresso = String
  type Cappuccino = String
  // some exceptions for things that might go wrong in the individual steps
  // (we'll need some of them later, use the others when experimenting
  // with the code):
  case class GrindingException(msg: String) extends Exception(msg)
  case class FrothingException(msg: String) extends Exception(msg)
  case class WaterBoilingException(msg: String) extends Exception(msg)
  case class BrewingException(msg: String) extends Exception(msg)
  def grind(beans: CoffeeBeans): Future[GroundCoffee] = {
    println("Start grinding with thread: " + Thread.currentThread().getId())
    Thread.sleep(200)
    if (beans == "baked beans") throw GrindingException("are you joking?")
    future {
      Thread.sleep(200)
      s"ground coffee of $beans"
    }
  }
  def heatWater(water: Water): Future[Water] = {
    println("Heating the water with thread: " + Thread.currentThread().getId())
    Thread.sleep(200)
    future {
      water.copy(temperature = 85)
    }
  }
  def frothMilk(milk: Milk): Future[FrothedMilk] = {
    println("milk frothing system engaged! with thread: " + Thread.currentThread().getId())
    Thread.sleep(200)
    future {
      s"frothed $milk"
    }
  }
  def brew(coffee: GroundCoffee, heatedWater: Water): Future[Espresso] = {
    println("happy brewing :) with thread: " + Thread.currentThread().getId())
    Thread.sleep(200)
    future {
      s"espresso"
    }
  }
  def combine(espresso: Espresso, frothedMilk: FrothedMilk): Future[String] =  {
    println("Combine starting with thread: " + Thread.currentThread().getId())
    Thread.sleep(200)
    future {
      Thread.sleep(20)
      "Your Cappuccino is ready"
    }
  }
  // going through these steps sequentially:
  def prepareCappuccino() = {
    val coffees = grind("not baked beans")
    val waters = heatWater(Water(25))
    val milks = frothMilk("milk")
    val combined = for {
      coffee <- coffees
      water <- waters
      brewed <- brew(coffee, water)
      milk <- milks
      combined <- combine(brewed, milk)
    } yield combined
    combined onComplete {
      case Success(t)   => println("combined is done")
      case Failure(t)   => t
    }
    coffees onComplete {
      case Success(t)   => println("Coffee is done")
      case Failure(t)   => t
    }
    combined
  }
}

最后======>

val myFutureCappuccino = new FutureCappuccino()
  val myCoffee = myFutureCappuccino.prepareCappuccino
  Thread.sleep(2000)
  myCoffee onComplete{
    case Success(t) =>  println(t)
    case Failure(p) =>  println(p.getMessage())
  }

现在对输出感到满意:

Start grinding with thread: 1
Heating the water with thread: 1
milk frothing system engaged! with thread: 1
happy brewing :) with thread: 8
Coffee is done
Combine starting with thread: 8
combined is done
Your Cappuccino is ready

在这里分享答案,希望它能帮助到别人。谢谢

最新更新