找到java.util.concurrent.future所需的scala.concurrent.future



相关:scala.concurrent.future包装器Java.util.concurrent.future

这来自我的另一个问题:

如何将akka流kafka(反应性-kafka)集成到akka http应用程序中?

我有一个AKKA HTTP应用程序,我想在我的路线中的oncomplete函数中向Kafka发送消息/ProducterRecord,如以下内容:

val producer : KafkaProducer = new KafkaProducer(producerSettings)
val routes : Route = 
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>  
          complete(BadRequest -> "invalid user")
        case Valid(u: User) => { 
          val producerRecord = 
            new ProducerRecord[Array[Byte], String]("topic1","some message")
          onComplete(producer.send(producerRecord)) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    }
  }

但是, oncomplete(生产者发送producerRecord)正在生成以下类型不匹配错误:

[错误]发现:未来[org.apache.kafka.clients.producer.recordmetadata](在java.util.concurrent)[错误]必需:未来[org.apache.kafka.clients.producer.recordmetadata](在scala.concurrent中)[错误] oncompleterecordmetadata {_ =>

是否有任何方式,也许是将生产者用作水槽(http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-a-sink)代替Java producer.send 函数?

您可以利用Cake的Kafka客户端,这将完成运行Java期货的工作,并为您提供Scala Futures。一旦确保创建cakesolutions.kafka.KafkaProducer而不是org.apache.kafka.clients.producer.KafkaProducer,其余的代码几乎应该保持不变。

另外,您可以将其分类为利用反应性kafka,同时继续使用高级AKKA HTTP DSL。您可以通过将生产者记录运行到Kafka接收器来做到这一点,这样:

val producerSink = Producer.plainSink(producerSettings)
...
        // inside the route
        val producerRecord =
          new ProducerRecord[Array[Byte], String]("topic1", "some message")
        onComplete(Source.single(producerRecord).runWith(producerSink)) { _ =>
          complete(ToResponseMarshallable((StatusCodes.Created, u)))
        }

要回答您的特定问题,scala-java8-compat库提供了Java8和Scala期货之间的转换器。

特别是,您可以使用FutureConverters.toScala(producer.send(producerRecord))java.util.concurrent.Future转换为scala.concurrent.Future

但是,使用具有Scala友好型API本身的客户库库(如上面的Stefano所建议)可能会带来最佳结果。

相关内容

  • 没有找到相关文章

最新更新