如何以惯用的方式将错误日志添加到我的Akka流中



我目前正在运行类似于以下内容的Akka流设置:

                 ┌───────────────┐                 
┌─────────────┐  │┌─────────────┐│                 
│REST endpoint│──▶│Queue source ││                 
└─────────────┘  │└──────╷──────┘│                 
                 │┌──────▼──────┐│                 
                 ││   Flow[T]   ││                 
                 │└──────╷──────┘│                 
                 │┌──────▼──────┐│  ┌─────────────┐
                 ││  KafkaSink  │├─▶│ Kafka topic │
                 │└─────────────┘│  └─────────────┘
                 └───────────────┘                 

虽然这项工作很好,但我想对生产系统有一些了解,即是否存在错误以及什么样的错误。例如,我将KafkaSink封装为RestartSink.withBackoff,并将以下属性应用于封装的接收器:

private val decider: Supervision.Decider = {
  case x =>
    log.error("KafkaSink encountered an error and will stop", x)
    Supervision.Stop
}
Flow[...]
  .log("KafkaSink")
  .to(Producer.plainSink(...))
  .withAttributes(ActorAttributes.supervisionStrategy(decider))
  .addAttributes(
    ActorAttributes.logLevels(
      onElement = Logging.DebugLevel,
      onFinish = Logging.WarningLevel,
      onFailure = Logging.ErrorLevel
    )
  )

这确实为我提供了一些见解,例如,我将获得一条下游已关闭的日志消息,以及通过我添加的supervisionStrategy出现的异常。

然而,这种解决方案感觉有点像一种变通方法(例如,将异常记录为监督策略(,而且它也没有对RestartWithBackoffSink的行为提供任何见解。当然,我可以为该类启用DEBUG级别的日志记录,但同样,这感觉像是在生产中要做的一个变通方法。

长话短说:

  • 我试图深入了解Akka流中发生的错误的方式有任何明显的缺点吗
  • 有没有更好/更惯用的方法在生产中向Akka流添加日志记录

我想你差不多到了!!

实际上,这是文档中描述的方式。使用log()方法可以更细粒度地控制流中元素的日志记录级别、流的完成和失败。尽管如此,我不喜欢在主管策略中添加日志消息。如果您确实想显示特定的异常,那么,创建一个自定义异常,在主管策略中捕获它,并让Akka为您记录该消息。您可以在Akka流配置中启用debug-logging,默认情况下,off用于DEBUG日志级别的其他故障排除日志记录。除此之外,您还可以在actor级别启用日志记录。(请参阅本文档(。

我认为在生产中,可能有两种方法来记录错误:

1( 在恢复阶段记录或重新引发异常。通过这种方式,所有来自上游的异常都将被捕获并记录:

object AkkaStreamRecap extends App {
  implicit val system = ActorSystem("AkkaStreamsRecap")
  implicit val materialiser = ActorMaterializer()
  import system.dispatcher
  val source = Source(-5 to 5) 
  val sink = Sink.foreach[Int](println)
  val flow = Flow[Int].map(x => 1 / x)
  val runnableGraph = source.
    via(flow).
    recover {
      case e => throw e
    }.
    to(sink)
  runnableGraph.run()
}

输出:

0
0
0
0
-1
[ERROR] [03/06/2020 16:27:58.703] [AkkaStreamsRecap-akka.actor.default-dispatcher-2] [akka://AkkaStreamsRecap/system/StreamSupervisor-0/flow-0-0-ignoreSink] Error in stage [Recover(<function1>)]: / by zero
java.lang.ArithmeticException: / by zero
    at com.personal.akka.http.chapter1.AkkaStreamRecap$.$anonfun$flow$1(AkkaStreamRecap.scala:41)
    at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:23)
    at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:54)
    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:480)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:376)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
    at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:576)
    at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:682)
    at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:731)
    at akka.actor.Actor.aroundPreStart(Actor.scala:550)
    at akka.actor.Actor.aroundPreStart$(Actor.scala:550)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:671)
    at akka.actor.ActorCell.create(ActorCell.scala:676)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:547)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:569)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:293)
    at akka.dispatch.Mailbox.run(Mailbox.scala:228)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

2( 定义自定义监督策略,并在流属性或物化器设置中使用:

object AkkaStreamRecap extends App {
  implicit val system = ActorSystem("AkkaStreamsRecap")
  private val decider: Supervision.Decider = {
    case e: ArithmeticException =>
      println("Arithmetic exception: Divide by Zero")
      Supervision.Stop
  }
  implicit val materialiser = ActorMaterializer(ActorMaterializerSettings(system).withSupervisionStrategy(decider))
  import system.dispatcher

  val source = Source(-5 to 5)
  val sink = Sink.foreach[Int](println)
  val flow = Flow[Int].map(x => 1 / x)
  val runnableGraph = source.via(flow).log("divide by zero").to(sink)
  runnableGraph.run()
}

输出:

0
0
0
0
-1
Arithmetic exception: Divide by Zero
[ERROR] [03/06/2020 16:37:00.740] [AkkaStreamsRecap-akka.actor.default-dispatcher-2] [akka.stream.Log(akka://AkkaStreamsRecap/system/StreamSupervisor-0)] [divide by zero] Upstream failed.
java.lang.ArithmeticException: / by zero
    at com.personal.akka.http.chapter1.AkkaStreamRecap$.$anonfun$flow$1(AkkaStreamRecap.scala:26)
    at scala.runtime.java8.JFunction1$mcII$sp.apply(JFunction1$mcII$sp.java:23)
    at akka.stream.impl.fusing.Map$$anon$1.onPush(Ops.scala:54)
    at akka.stream.impl.fusing.GraphInterpreter.processPush(GraphInterpreter.scala:523)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:480)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:376)
    at akka.stream.impl.fusing.GraphInterpreterShell.runBatch(ActorGraphInterpreter.scala:606)
    at akka.stream.impl.fusing.GraphInterpreterShell.init(ActorGraphInterpreter.scala:576)
    at akka.stream.impl.fusing.ActorGraphInterpreter.tryInit(ActorGraphInterpreter.scala:682)
    at akka.stream.impl.fusing.ActorGraphInterpreter.preStart(ActorGraphInterpreter.scala:731)
    at akka.actor.Actor.aroundPreStart(Actor.scala:550)
    at akka.actor.Actor.aroundPreStart$(Actor.scala:550)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundPreStart(ActorGraphInterpreter.scala:671)
    at akka.actor.ActorCell.create(ActorCell.scala:676)
    at akka.actor.ActorCell.invokeAll$1(ActorCell.scala:547)
    at akka.actor.ActorCell.systemInvoke(ActorCell.scala:569)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:293)
    at akka.dispatch.Mailbox.run(Mailbox.scala:228)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:241)
    at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

如果有帮助,请告诉我!!

附言……我在官方文件中找不到任何其他记录错误的来源或方式。

最新更新