从可能产生错误情况的 Akka 流准备正确的 HTTP 响应



我打算使用 Akka Streams 对一个琐碎的游戏玩法(HTTPReq/HTTPResp(进行建模。在一个回合中,玩家被挑战由服务器猜测一个数字。服务器检查玩家的响应,如果服务器持有的内容和玩家猜测的内容相同,则给玩家一个分数。

典型的流程是这样的:

  • 玩家(已通过身份验证,会话 ID 已分配(请求开始一轮
  • 服务器检查会话 ID 是否有效;如果无效,则通知玩家一条合适的消息
  • 服务器生成一个号码并向玩家提供,以及一个 RoundID

。等等。没什么了不起的。

这是类型和流程的粗略排列:

import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Keep, Source}
import java.util.Random
import akka.stream.scaladsl.Sink
import scala.concurrent.Future
import scala.util.{Failure, Success}
sealed trait GuessingGameMessageToAndFro
case class StartARound(sessionID: String) extends GuessingGameMessageToAndFro
case class RoundStarted(sessionID: String, roundID: Int) extends GuessingGameMessageToAndFro
case class NumberGuessed(sessionID: String, roundID: Int, guessedNo: Int) extends GuessingGameMessageToAndFro
case class CorrectNumberGuessed(sessionID: String, nextRoundID: Int) extends GuessingGameMessageToAndFro
case class FinalRoundScore(sessionID: String, finalScore: Int) extends GuessingGameMessageToAndFro
case class MissingSession(sessionID: String) extends GuessingGameMessageToAndFro
case class IncorrectNumberGuessed(sessionID: String, clientGuessed: Int, serverChose: Int) extends GuessingGameMessageToAndFro
object SessionService {
  def exists(m: StartARound) = if (m.sessionID.startsWith("1")) m else MissingSession(m.sessionID)
}
object NumberGenerator {
  def numberToOfferToPlayer(m: GuessingGameMessageToAndFro) = {
    m match {
      case StartARound(s)     =>  RoundStarted(s, new Random().nextInt())
      case MissingSession(s)  =>  m
      case _                  =>  throw new RuntimeException("Not yet implemented")
    }
  }
}
val sessionExistenceChecker: Flow[StartARound,GuessingGameMessageToAndFro,NotUsed]
      = Flow.fromFunction(m => SessionService.exists(m))
val guessNumberPreparator: Flow[GuessingGameMessageToAndFro,GuessingGameMessageToAndFro,_]
      = Flow.fromFunction(m => NumberGenerator.numberToOfferToPlayer(m))
val s1 = StartARound("123")
val k =
  Source
    .single(s1)
    .via(sessionExistenceChecker)
    .via(guessNumberPreparator)
    .toMat(Sink.head)(Keep.right)
val finallyObtained = k.run
finallyObtained.onComplete(v => {
  v match {
    case Success(x)    => //   Prepare proper HTTP Response
    case Failure(ex)   => //   Prepare proper HTTP Response
  }
})

我在 numberToOfferToPlayer(( 中经历一个长模式匹配块的原因是(我在这里显示了 2,但显然它的大小会随着每种可以流动的类型而增加(是因为如果像 sessionExistChecker 这样的运算符生成一个 MissingSession(这是一个错误条件(,它必须通过流的其余部分,保持不变直到它到达未来[完成]阶段。事实上,问题更普遍:在任何阶段,适当的转换都应该导致可接受的类型或错误类型(互斥(。如果我遵循这种方法,模式匹配块将激增,代价是不必要的重复,如果不是丑陋的话。

我对我的这个解决方案感到不舒服。它正变得啰嗦和笨拙。

不用说,我还没有在这里展示面向 Akka-HTTP 的部分(包括路由(。上面的代码可以很容易地用路由处理程序缝合。所以,我跳过了它。

我的问题是:对于这样的流,什么是正确的成语?从概念上讲,如果一切正常,元素应该沿着流继续移动。但是,每当发生错误时,(错误(元素应直接跳到最后阶段,跳过其间的所有其他阶段。对此进行建模的公认方法是什么?

我已经浏览了许多 Stackoverflow 帖子,这些帖子表明,对于类似情况,应该采用分区/合并方式。我理解我如何采用这种方法,但对于像我这样的简单案例,这似乎是不必要的工作。或者,我在这里完全偏离了目标吗?

指关节上的任何提示,片段或说唱,将不胜感激。

使用 Partial Function

对于这个特定的用例,我通常同意分区和合并设置是"不必要的工作"。 问题中提到的其他堆栈帖子适用于您只有Flow值要组合而无法操作 Flow 中的底层逻辑的用例。

当您能够修改底层逻辑时,存在更简单的解决方案。 但解决方案并不严格地存在于 akka 的领域内。 相反,你可以利用 scala 本身中可用的函数式编程结构。

如果将numberTofferToPlayer函数重写为PartialFunction

object NumberGenerator {
  val numberToOfferToPlayer : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
    case s : StartARound  =>  RoundStarted(s.sessionID, new Random().nextInt())
  }
}

然后,可以将此PartialFunction提升为常规函数,如果消息为StartARound类型,则将应用逻辑,或者如果消息是任何其他类型,则仅转发消息。

这种提升是通过 PartialFunction 的applyOrElse方法与 scala 中预定义的 identity 函数一起完成的,该函数将输入作为输出返回(即"转发"输入(:

import NumberGenerator.numberToOfferToPlayer
val messageForwarder : GuessingGameMessageToAndFro => GuessingGameMessageToAndFro = 
  identity[GuessingGameMessageToAndFro]
val guessNumberPreparator: Flow[GuessingGameMessageToAndFro,GuessingGameMessageToAndFro,_] =
  Flow fromFunction (numberToOfferToPlayer applyOrElse (_, messageForwarder))

更高级别的抽象

如果您有以下几个要添加转发逻辑的分部函数:

val foo : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
  case r : RoundStarted => ???
}
val bar : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] = {
  case n : NumberGuessed => ???
}

然后你可以写一个通用的提升器,它将抽象出常规函数的创建:

val applyOrForward : PartialFunction[GuessingGameMessageToAndFro, GuessingGameMessageToAndFro] => GuessingGameMessageToAndFro => GuessingGameMessageToAndFro =
  ((_ : PartialFunction[Int, Int]) applyOrElse ((_ : GuessingGameMessageToAndFro), messageForwader).curried

这个升降机将很好地清理你的代码:

val offerFlow = Flow fromFunction applyOrForward(numberToOfferToPlayer)
val fooFlow = Flow fromFunction applyOrForward(foo)
val barFlow = Flow fromFunction applyOrForward(bar)

然后,可以按照问题描述的方式组合这些流:

val combinedFlow = offerFlow via fooFlow via barFlow

同样,您可以通过先组合 PartFunctions,然后从组合中创建单个流来获得相同的结果。 这对于 akka 之外的单元测试很有用:

val combinedPartial = numberToOfferToPlayer orElse foo orElse bar
//no akka test kit necessary
assert {
  val testError = MissingSession("testId")
  applyOrForward(combinedPartial)(testError) equals testError
}     
//nothing much to test
val otherCombinedFlow = Flow fromFunction applyOrForward(combinedPartial)

最新更新