如何获取Kafka消费者(Alpakka)返回的Map[String,String]



我应该从Kafka消费者那里得到一个Map[String,String],但我真的不知道怎么做。我设法配置了消费者,它工作得很好,但我不明白如何才能获得Map。

implicit val system: ActorSystem = ActorSystem()
val consumerConfig = system.settings.config.getConfig("akka.kafka.consumer")
val = kafkaConsumerSettings =
ConsumerSettings(consumerConfig, new StringDeserializer, new StringDeserializer)
.withBootstrapServers(localhost:9094)
.withGroupId(group1)
Consumer
.plainSource(kafkaConsumerSettings, Subscriptions.topics(entity.entity_name))
.toMat(Sink.foreach(println))(DrainingControl.apply)
.run()

Lightbend的建议是在反序列化来自Kafka 的传入数据时处理字节数组

消息的反序列化的一般建议是使用字节数组(或字符串(作为值,并在Akka Stream中的映射操作中进行反序列化,而不是直接在Kafka反序列化器中实现。当在Akka Stream中显式处理反序列化时,可以更容易地实现所需的错误处理策略,如下例所示。

为此,您可以使用以下设置设置消费者:

val consumerSettings = ConsumerSettings(consumerConfig, new StringDeserializer, new ByteArrayDeserializer)

并通过从Record类中调用.value()方法来获得结果。要反序列化它,我建议使用circe+jawn。这个代码应该可以完成任务。

import io.circe.jawn
import io.circe.generic.auto._
val bytes = record.value()
val data = jawn.parseByteBuffer(ByteBuffer.wrap(bytes)).flatMap(_.as[Map[String, String]])

相关内容

最新更新