相关:scala.concurrent.future包装器Java.util.concurrent.future
这来自我的另一个问题:
如何将akka流kafka(反应性-kafka)集成到akka http应用程序中?
我有一个AKKA HTTP应用程序,我想在我的路线中的oncomplete函数中向Kafka发送消息/ProducterRecord,如以下内容:
val producer : KafkaProducer = new KafkaProducer(producerSettings)
val routes : Route =
post {
entity(as[User]) { user =>
val createUser = userService.create(user)
onSuccess(createUser) {
case Invalid(y: NonEmptyList[Err]) =>
complete(BadRequest -> "invalid user")
case Valid(u: User) => {
val producerRecord =
new ProducerRecord[Array[Byte], String]("topic1","some message")
onComplete(producer.send(producerRecord)) { _ =>
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
}
}
}
}
但是, oncomplete(生产者发送producerRecord)正在生成以下类型不匹配错误:
[错误]发现:未来[org.apache.kafka.clients.producer.recordmetadata](在java.util.concurrent)[错误]必需:未来[org.apache.kafka.clients.producer.recordmetadata](在scala.concurrent中)[错误] oncompleterecordmetadata {_ =>
是否有任何方式,也许是将生产者用作水槽(http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-a-sink)代替Java producer.send 函数?
您可以利用Cake的Kafka客户端,这将完成运行Java期货的工作,并为您提供Scala Futures。一旦确保创建cakesolutions.kafka.KafkaProducer
而不是org.apache.kafka.clients.producer.KafkaProducer
,其余的代码几乎应该保持不变。
另外,您可以将其分类为利用反应性kafka,同时继续使用高级AKKA HTTP DSL。您可以通过将生产者记录运行到Kafka接收器来做到这一点,这样:
val producerSink = Producer.plainSink(producerSettings)
...
// inside the route
val producerRecord =
new ProducerRecord[Array[Byte], String]("topic1", "some message")
onComplete(Source.single(producerRecord).runWith(producerSink)) { _ =>
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
要回答您的特定问题,scala-java8-compat库提供了Java8和Scala期货之间的转换器。
特别是,您可以使用FutureConverters.toScala(producer.send(producerRecord))
将java.util.concurrent.Future
转换为scala.concurrent.Future
但是,使用具有Scala友好型API本身的客户库库(如上面的Stefano所建议)可能会带来最佳结果。