Consumer interrupted with WakeupException after timeout. Message: null. akka.kafka.consumer.wakeup



I am trying to write a simple Kafka consumer using Akka Streams.

build.sbt

"com.typesafe.akka" %% "akka-stream-kafka" % "0.17"

My code

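import akka.actor.ActorSystem
import akka.kafka.{ConsumerMessage, ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
import scala.concurrent.Await
import scala.concurrent.duration.Duration

object AkkaStreamskafka extends App {
   // consumer settings
   implicit val system = ActorSystem()
   implicit val actorMaterializer = ActorMaterializer()
   import system.dispatcher // execution context for the Future.map below
   val consumerSettings = ConsumerSettings(system, Some(new ByteArrayDeserializer), Some(new StringDeserializer))
      .withBootstrapServers("foo:9092")
      .withGroupId("abhi")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
   val source = Consumer
      .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
   // commit each offset, then pass the record value downstream
   val flow = Flow[ConsumerMessage.CommittableMessage[Array[Byte], String]].mapAsync(1) { msg =>
         msg.committableOffset.commitScaladsl().map(_ => msg.record.value)
      }
   val sink = Sink.foreach[String](println)
   val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit builder =>
      s =>
         import GraphDSL.Implicits._
         source ~> flow ~> s.in
         ClosedShape
   })
   val future = graph.run()
   Await.result(future, Duration.Inf)
}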

But I get this warning

[WARN] [09/28/2017 13:12:52.333] [default-akka.kafka.default-dispatcher-7] 
[akka://default/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout. 
Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds

Edit:

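I can ssh to foo and run the following command in a terminal on the server: ./kafka-console-consumer --zookeeper localhost:2181 --topic my-topic, and I can see data. So I assume my server name foo is correct and that Kafka is up and running on that machine.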
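Running the console consumer on the broker itself does not exercise the network path from the client machine. A minimal sketch of a reachability check from the client side follows, using only the JDK socket API. The object and method names (BrokerReachability, canConnect) are hypothetical, and a successful TCP connect only shows the port is reachable, not that the client and broker versions are compatible.

```scala
import java.io.IOException
import java.net.{InetSocketAddress, Socket}

// Hypothetical helper, not part of akka-stream-kafka or Kafka.
object BrokerReachability {
  // Returns true if a TCP connection to host:port opens within timeoutMs.
  // This checks the network path only; it does not speak the Kafka protocol.
  def canConnect(host: String, port: Int, timeoutMs: Int = 3000): Boolean = {
    val socket = new Socket()
    try {
      socket.connect(new InetSocketAddress(host, port), timeoutMs)
      true
    } catch {
      // covers refused connections, timeouts and unknown hosts
      case _: IOException => false
    } finally {
      socket.close()
    }
  }
}
```

From the machine that runs the stream, this would be called as BrokerReachability.canConnect("foo", 9092).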

Edit 2:

On the Kafka server I am running Cloudera 5.7.1. The Kafka version is jars/kafka_2.10-0.9.0.9.0-kafka-2.0.0.0.jar

I was able to solve the problem myself.

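"com.typesafe.akka" %% "akka-stream-kafka" only works with Kafka 0.10 and later. It does not work with earlier versions of Kafka. When I listed the Kafka jars on the Kafka server, I found that I was using Cloudera 5.7.1, which ships with Kafka 0.9.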
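The compatibility rule above can be sketched as a small check. This is only an illustration: the object and method names (KafkaClientChoice, majorMinor, supportsAkkaStreamKafka) are hypothetical, and the 0.10 threshold is taken from the statement above rather than from any library API.

```scala
import scala.util.Try

// Hypothetical helper, not part of any Kafka or Akka library.
object KafkaClientChoice {
  // Parse the leading "major.minor" of a version string such as "0.9.0" or "0.10.2.1".
  def majorMinor(version: String): Option[(Int, Int)] =
    version.split('.').toList match {
      case major :: minor :: _ => Try((major.toInt, minor.toInt)).toOption
      case _                   => None
    }

  // True when the broker is Kafka 0.10 or later, the range that
  // akka-stream-kafka is described as supporting above.
  def supportsAkkaStreamKafka(brokerVersion: String): Boolean =
    majorMinor(brokerVersion).exists { case (major, minor) =>
      major > 0 || minor >= 10
    }
}
```

For the broker in this question, KafkaClientChoice.supportsAkkaStreamKafka("0.9.0") is false, which is why a different client library was needed.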

To create an Akka Streams source for this version of Kafka, I needed to use

"com.softwaremill.reactivekafka" % "reactive-kafka-core_2.11" % "0.10.0"

They also have an example at https://github.com/kciesielski/reactive-kafka

This code works perfectly for me

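import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Await
import scala.concurrent.duration.Duration

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
import actorSystem.dispatcher // execution context for onComplete below
val kafka = new ReactiveKafka()
val consumerProperties = ConsumerProperties(
  bootstrapServers = "foo:9092",
  topic = "my-topic",
  groupId = "abhi",
  valueDeserializer = new StringDeserializer()
)
val source = Source.fromPublisher(kafka.consume(consumerProperties))
val flow = Flow[ConsumerRecord[Array[Byte], String]].map(r => r.value())
val sink = Sink.foreach[String](println)
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit builder =>
  s =>
    import GraphDSL.Implicits._
    source ~> flow ~> s.in
    ClosedShape
})
val future = graph.run()
// shut the actor system down once the stream completes
future.onComplete { _ =>
  actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)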
