我有一个基本的scala akka http crud应用程序。有关类的相关类,请参见下文。
我只是想在创建/更新实体时将一个实体ID和一些数据(作为JSON)编写给Kafka主题。
我正在寻找http://doc.akka.io/docs/akka-stream-kafka/current/producer.html,但是对Scala和Akka来说是新手,不确定如何将其集成到我的应用程序中?
例如,从上面的文档中,这是生产者写给卡夫卡的示例,所以我认为我需要类似的东西,但是在我的应用程序中,这是否应该去?在创建用户后,我可以在服务中的创建方法中添加另一个地图调用?
非常感谢!
val done = Source(1 to 100)
.map(_.toString)
.map { elem =>
new ProducerRecord[Array[Byte], String]("topic1", elem)
}
.runWith(Producer.plainSink(producerSettings))
或我需要在我的server.scala?
中https://github.com/hseeberger/accessus做类似示例的事情。weberver.scala
object System {
implicit val system = ActorSystem()
implicit val dispatcher = system.dispatcher
implicit val actorMaterializer = ActorMaterializer()
}
object WebServer extends App {
import System._
val config = new ApplicationConfig() with ConfigLoader
ConfigurationFactory.setConfigurationFactory(new LoggingConfFileConfigurationFactory(config.loggingConfig))
val injector = Guice.createInjector(new MyAppModule(config))
val routes = injector.getInstance(classOf[Router]).routes
Http().bindAndHandle(routes, config.httpConfig.interface, config.httpConfig.port)
}
router.scala
def routes(): Route = {
post {
entity(as[User]) { user =>
val createUser = userService.create(user)
onSuccess(createUser) {
case Invalid(y: NonEmptyList[Err]) => {
throw new ValidationException(y)
}
case Valid(u: User) => {
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
}
}
} ~
// More routes here, left out for example
}
service.scala
def create(user: User): Future[MaybeValid[User]] = {
for {
validating <- userValidation.validateCreate(user)
result <- validating match {
case Valid(x: User) =>
userRepo.create(x)
.map(dbUser => Valid(UserConverters.fromUserRow(x)))
case y: DefInvalid =>
Future{y}
}
} yield result
}
repo.scala
def create(user: User): Future[User] = {
mutateDbProvider.db.run(
userTable returning userTable.map(_.userId)
into ((user, id) => user.copy(userId = id)) +=
user.copy(createdDate = Some(Timestamp.valueOf(LocalDateTime.now())))
)
}
由于您已经将Route
写给了Entity
的1 User
,所以我认为您不需要Producer.plainSink
。相反,我认为Producer.send
也会工作。另外,作为旁注,抛出异常不是"惯用性" scala。因此,我更改了无效用户的逻辑:
val producer : KafkaProducer = new KafkaProducer(producerSettings)
val routes : Route =
post {
entity(as[User]) { user =>
val createUser = userService.create(user)
onSuccess(createUser) {
case Invalid(y: NonEmptyList[Err]) =>
complete(BadRequest -> "invalid user")
case Valid(u: User) => {
val producerRecord =
new ProducerRecord[Array[Byte], String]("topic1",s"""{"userId" : ${u.userId}, "entity" : "User"}""")
onComplete(producer send producerRecord) { _ =>
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
}
}
}
}