使用Akka流时,我似乎永远无法正确处理错误。
所以这是我的代码
var db = Database.forConfig("oracle")
var mysqlDb = Database.forConfig("mysql_read")
var mysqlDbWrite = Database.forConfig("mysql_write")
implicit val actorSystem = ActorSystem()
val decider : Supervision.Decider = {
case _: Exception =>
println("got an exception restarting connections")
// let us restart our connections
db.close()
mysqlDb.close()
mysqlDbWrite.close()
db = Database.forConfig("oracle")
mysqlDb = Database.forConfig("mysql_read")
mysqlDbWrite = Database.forConfig("mysql_write")
Supervision.Restart
}
implicit val materializer = ActorMaterializer(ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
我有这样的流程
val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(10){ foo =>
try {
val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long]
mysqlDbWrite.run(existsQuery).map(v => (foo, v))
} catch {
case e: Throwable =>
println(s"Lookup failed for ${foo}")
throw e // will restart the stream
}
}.collect {case (f, v) if v.isEmpty => f}
基本上,如果foo已经存在于mysql中,则不应通过流进行记录。
我对此代码的希望是,如果MySQL查找失败(MySQL机器非常糟糕,超时很常见),则记录将被打印和丢弃,并且该流将继续由剩下的记录进行监督。。
运行此代码时。我看到
之类的错误[error] (mysql_write network timeout executor) java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state
java.lang.RuntimeException: java.sql.SQLException: Invalid socket timeout value or state
at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5576)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.sql.SQLException: Invalid socket timeout value or state
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:998)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:937)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:926)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:872)
at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4852)
at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.net.SocketException: Socket is closed
at java.net.Socket.setSoTimeout(Socket.java:1137)
at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850)
at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
和
[error] (mysql_write network timeout executor) java.lang.NullPointerException
java.lang.NullPointerException
at com.mysql.jdbc.MysqlIO.setSocketTimeout(MysqlIO.java:4850)
at com.mysql.jdbc.ConnectionImpl$12.run(ConnectionImpl.java:5574)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
在这里令我感到惊讶的一件事是,这些例外不是来自我的捕获块。因为我看不到我的捕获块的println语句。堆栈跟踪不会向我显示它起源于...,但是由于它说 mysql_write
,所以我可以假设它的流程上方是因为只有此流程使用 mysql_write
。
最后,整个流都带有错误
[trace] Stack trace suppressed: run last compile:runMain for the full output.
flow has failed with error Shutting down because of violation of the Reactive Streams specification.
14:51:06,973 |-INFO in ch.qos.logback.classic.AsyncAppender[asyncKafkaAppender] - Worker thread will flush remaining events before exiting.
[success] Total time: 3480 s, completed Sep 26, 2017 2:51:07 PM
14:51:07,603 |-INFO in ch.qos.logback.core.hook.DelayingShutdownHook@2320545b - Sleeping for 1 seconds
我不知道我为违反反应流的规范做了什么!
获得更可预测的解决方案的第一个刺伤是删除阻塞行为(Await.result
)并使用mapAsync
。alreadyExistsFilter
流的重写可能是:
val alreadyExistsFilter : Flow[Foo, Foo, NotUsed] = Flow[Foo].mapAsync(3) { foo ⇒
val existsQuery = sql"""SELECT id FROM foo WHERE id = ${foo.id}""".as[Long]
foo → Await.result(mysqlDbWrite.run(existsQuery), Duration.Inf)
}.collect{
case (foo, res) if res.isDefined ⇒ foo
}
可以在文档中找到有关阻止Akka的更多信息。
Stefano给出的答案是正确的。由于流程中的阻止代码,错误确实发生了。
,尽管我的初始程序是针对Scala 2.11的2.11,甚至在切换到MapAsync之后,问题仍然存在。
由于这是一个命令行工具,因此我很容易切换到Scala 2.12,然后重试。
当我尝试使用Scala 2.12时,它可以很好地工作。
对我有很大帮助的一件事是在依赖项中拥有"ch.qos.logback" % "logback-classic" % "1.2.3",
。这将向您显示正在执行的每个SQL语句,并轻松查看是否出现了问题。