I am trying to write a simple Kafka consumer using Akka Streams.
build.sbt
"com.typesafe.akka" %% "akka-stream-kafka" % "0.17"
My code:
object AkkaStreamskafka extends App {
  import akka.actor.ActorSystem
  import akka.kafka.scaladsl.Consumer
  import akka.kafka.{ConsumerMessage, ConsumerSettings, Subscriptions}
  import akka.stream.{ActorMaterializer, ClosedShape}
  import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink}
  import org.apache.kafka.clients.consumer.ConsumerConfig
  import org.apache.kafka.common.serialization.{ByteArrayDeserializer, StringDeserializer}
  import scala.concurrent.Await
  import scala.concurrent.duration.Duration

  // consumer settings
  implicit val system = ActorSystem()
  implicit val actorMaterializer = ActorMaterializer()
  import system.dispatcher // ExecutionContext for the Future returned by commitScaladsl

  val consumerSettings = ConsumerSettings(system, Some(new ByteArrayDeserializer), Some(new StringDeserializer))
    .withBootstrapServers("foo:9092")
    .withGroupId("abhi")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
  val source = Consumer
    .committableSource(consumerSettings, Subscriptions.topics("my-topic"))
  val flow = Flow[ConsumerMessage.CommittableMessage[Array[Byte], String]].mapAsync(1) { msg =>
    msg.committableOffset.commitScaladsl().map(_ => msg.record.value)
  }
  val sink = Sink.foreach[String](println)
  val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit builder => s =>
    import GraphDSL.Implicits._
    source ~> flow ~> s.in
    ClosedShape
  })
  val future = graph.run()
  Await.result(future, Duration.Inf)
}
But I get this error:
[WARN] [09/28/2017 13:12:52.333] [default-akka.kafka.default-dispatcher-7]
[akka://default/system/kafka-consumer-1] Consumer interrupted with WakeupException after timeout.
Message: null. Current value of akka.kafka.consumer.wakeup-timeout is 3000 milliseconds
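One knob worth checking while debugging a connection problem like this is the wakeup timeout named in the warning itself. A minimal `application.conf` override sketch (the key names below are from the akka-stream-kafka 0.17 reference configuration; the values are illustrative, not a recommendation):

```hocon
# application.conf: give the consumer more time before a wakeup is triggered
akka.kafka.consumer {
  wakeup-timeout = 10s   # warning above shows the default of 3000 milliseconds
  max-wakeups = 10       # how many wakeups are tolerated before the stage fails
}
```

Raising the timeout only helps if the broker is reachable but slow; it will not fix a protocol-level incompatibility.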
Edit:
I can ssh into foo and then run ./kafka-console-consumer --zookeeper localhost:2181 --topic my-topic in the server's terminal, and I can see data. So I assume my server name foo is correct and that Kafka is up and running on that machine.
Edit 2:
On the Kafka server I am running Cloudera 5.7.1. The Kafka version is jars/kafka_2.10-0.9.0-kafka-2.0.0.jar
I was able to resolve the issue myself.
The library "com.typesafe.akka" %% "akka-stream-kafka"
only works with Kafka 0.10 and later. It does not work with earlier versions of Kafka. When I listed the Kafka jars on the Kafka server, I found that I was on Cloudera 5.7.1, which ships with Kafka 0.9.
To create an Akka Streams source for this version of Kafka, I needed to use
"com.softwaremill.reactivekafka" % "reactive-kafka-core_2.11" % "0.10.0"
They also have an example project at https://github.com/kciesielski/reactive-kafka
This code works perfectly for me:
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Flow, GraphDSL, RunnableGraph, Sink, Source}
import com.softwaremill.react.kafka.{ConsumerProperties, ReactiveKafka}
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import scala.concurrent.Await
import scala.concurrent.duration.Duration

implicit val actorSystem = ActorSystem()
implicit val actorMaterializer = ActorMaterializer()
import actorSystem.dispatcher // ExecutionContext for onComplete

val kafka = new ReactiveKafka()
val consumerProperties = ConsumerProperties(
  bootstrapServers = "foo:9092",
  topic = "my-topic",
  groupId = "abhi",
  valueDeserializer = new StringDeserializer()
)
val source = Source.fromPublisher(kafka.consume(consumerProperties))
val flow = Flow[ConsumerRecord[Array[Byte], String]].map(r => r.value())
val sink = Sink.foreach[String](println)
val graph = RunnableGraph.fromGraph(GraphDSL.create(sink) { implicit builder => s =>
  import GraphDSL.Implicits._
  source ~> flow ~> s.in
  ClosedShape
})
val future = graph.run()
future.onComplete { _ =>
  actorSystem.terminate()
}
Await.result(actorSystem.whenTerminated, Duration.Inf)
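One caveat: unlike the original akka-stream-kafka version, this graph never commits offsets, so the consumer will re-read messages after a restart. reactive-kafka exposes a commit sink for that. A sketch based on its README (the 5-second commit interval is an arbitrary choice, and `kafka` / `consumerProperties` are the values defined above):

```scala
import akka.stream.scaladsl.Source
import scala.concurrent.duration._

// consumeWithOffsetSink pairs a publisher with a sink that commits
// the offset of every record that reaches it
val consumerWithOffsetSink = kafka.consumeWithOffsetSink(
  consumerProperties.commitInterval(5.seconds)
)

Source.fromPublisher(consumerWithOffsetSink.publisher)
  .map { record => println(record.value()); record } // process, then pass the record on
  .to(consumerWithOffsetSink.offsetCommitSink)       // commit its offset
  .run()
```

The map stage must emit the record itself (not the mapped value), because the commit sink needs the record's partition and offset.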