是否可以使用AkkaHttp,AkkaStreams,Alpakka和数据库构建OLTP / CRUD HTTP服务器?



我很清楚,使用Actor当然是可能的:例如 https://github.com/chbatey/akka-http-typed.git 使用的是AkkaHttp和类型化的actor。

但是我不清楚是否只使用 AkkaStreams 及其 Alpakka 连接器库(包括数据库(,是否可以执行常规的 CRUD/OLTP 服务,或者只是从一个数据库到另一个数据库的数据复制,或其他 OLAP/批处理/流处理方案。

如果您知道如何完成,请指出一些细节,例如,如果您可以在 github 上提供一个示例,那就太好了。

我认为可能的方式是服务器参与两个对话/有状态流转换:一个通过HTTP与外界,另一个与数据库。我不确定这是否可以像这样建模。

https://doc.akka.io/docs/alpakka/current/slick.html 似乎同时提供更新/插入作为接收器,并将 SELECT 指向某个 id 作为源。您是否知道是否有示例应用程序,或者您可以广泛提及 Akka Http 的接线方式吗?

我在这里放了一个演示,希望它能帮助你。

创建表,数据库是mysql。

CREATE TABLE test(id VARCHAR(32))

SBT:

"com.lightbend.akka"                        %% "akka-stream-alpakka-slick"     % "1.1.0",
"mysql"                                      % "mysql-connector-java"          % "5.1.40"

法典:

package tech.parasol.scala.crud
import java.sql.SQLException
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives.{complete, get, path, _}
import akka.stream.alpakka.slick.scaladsl.{Slick, SlickSession}
import akka.stream.scaladsl.Sink
import akka.stream.{ActorAttributes, ActorMaterializer, Supervision}
import com.typesafe.config.ConfigFactory
import scala.concurrent.Future
import scala.io.StdIn
import scala.util.{Failure, Success}
object CrudTest1 {

def main(args: Array[String]): Unit = {
implicit val system = ActorSystem("CrudTest1")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher


val hostName = "120.0.0.1"
val rocketDbConfig =
s"""
|db-config {
|  profile = "slick.jdbc.MySQLProfile$$"
|  db {
|    dataSourceClass = "slick.jdbc.DriverDataSource"
|    properties = {
|      driver = "com.mysql.jdbc.Driver"
|      url = "jdbc:mysql://${hostName}:3306/rocket?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true&useSSL=false"
|      user = "root"
|      password = "passw0rd"
|    }
|  }
|}
|
""".stripMargin

implicit val session = SlickSession.forConfig("db-config", ConfigFactory.parseString(rocketDbConfig))

import session.profile.api._
def persistence(message: String) = {
def insert(message: String): DBIO[Int] = {
sqlu"""INSERT INTO test(id) VALUES (${message})"""
}
session.db.run(insert(message)).map {
case _ => message
}.recover {
case e : SQLException => {
throw new Exception("Database error ===>")}
case e : Exception => {
throw new Exception("Database error.")}
}
}

val route = path("hello" / Segment ) { name =>
get {
val res = persistence(name)
onComplete(res) {
case Success(value) => {
complete(s"<h1>Say hello to ${name}</h1>")
}
case Failure(e) => {
complete(s"<h1>Failed to say hello to ${name}</h1>")
}
}
}
}
val bindingFuture = Http().bindAndHandle(route, "localhost", 8088)
println(s"Server online at http://localhost:8088/nPress RETURN to stop...")
StdIn.readLine() // let it run until user presses return
bindingFuture
.flatMap(_.unbind()) // trigger unbinding from the port
.onComplete(_ => system.terminate()) // and shutdown when done
}
}

是的,基本上在 AkkaHttp 中接收的每个请求时,我们都会创建一个 AkkaStreams Graph(通常只是一个管道(,基本上只是数据库中的 Slick Alpakka Source,可能由一些运算符作为前缀,然后在 AkkaHttp 中返回,这当然支持 Source。更多详情请见 [https://www.quora.com/Is-it-possible-to-build-an-OLTP-CRUD-HTTP-server-using-Akka-HTTP-Akka-Streams-Alpakka-and-a-database-Do-you-know-any-examples-of-code-on-GitHub-or-elsewhere/answer/Nicolae-Marasoiu]

最新更新