Kafka Consumer for Spark 用 Scala for Kafka API 0.10 编写:自定义 A



我正在将我的Spark Scala App Kafka API升级到v. 0.10。我曾经创建自定义方法来反序列化字节字符串格式的消息。

我已经意识到有一种方法可以将字符串反序列化程序或字节数组解序列化器作为参数传递给键或值。

但是,我找不到有关如何创建自定义 Avro 模式反序列化程序的任何信息,因此当我创建 DirectStream 并使用来自 Kafka 的数据时,我的 kafkaStream 可以使用它。

可能吗?

这是可能的。您需要重写org.apache.kafka.common.serialization中定义的Deserializer<T>接口,并且需要通过保存 Kafka 参数的ConsumerStrategy[K, V]类将key.deserializervalue.deserializer指向自定义类。例如:

import org.apache.kafka.common.serialization.Deserializer
class AvroDeserializer extends Deserializer[Array[Byte]] {
override def configure(map: util.Map[String, _], b: Boolean): Unit = ???
override def close(): Unit = ???
override def deserialize(s: String, bytes: Array[Byte]): Array[Byte] = ???
}

然后:

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import my.location.with.AvroDeserializer
val ssc: StreamingContext = ???
val kafkaParams = Map[String, Object](
"bootstrap.servers" -> "localhost:9092,anotherhost:9092",
"key.deserializer" -> classOf[StringDeserializer],
"value.deserializer" -> classOf[AvroDeserializer],
"group.id" -> "use_a_separate_group_id_for_each_stream",
"auto.offset.reset" -> "latest",
"enable.auto.commit" -> (false: java.lang.Boolean)
)
val topics = Array("sometopic")
val stream = KafkaUtils.createDirectStream[String, MyTypeWithAvroDeserializer](
ssc,
PreferConsistent,
Subscribe[String, String](topics, kafkaParams)
)

最新更新