为什么Kafka Direct Stream为每个消息创建一个新解码器



我有一个用java编写的火花流应用程序,并使用Spark 2.1。我正在使用KafkaUtils.createDirectStream阅读Kafka的消息。我正在使用Kryo编码器/解码器作为Kafka消息。我在kafka properties-> key.deserializer,value.deserializer,key.serializer,value.deserializer

中指定了这一点。当Spark在微批处理中拉消息时,使用Kryo解码器成功解码了消息。但是,我注意到Spark Executor创建了Kryo解码器的新实例,用于解码KAFKA读取的每条消息。我通过将日志放入解码器构造函数

中检查了一下这对我来说似乎很奇怪。每个消息和每个批次都不应使用相同的解码器实例吗?

代码我正在从kafka阅读的地方:

JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
        jssc,
        LocationStrategies.PreferConsistent(),
        ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams));
JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> {
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value());
});

如果我们想查看Spark如何从内部从Kafka获取数据,我们需要查看KafkaRDD.compute,这是为每个RDD实现的方法计算RDD

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
  if (part.fromOffset == part.untilOffset) {
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
    s"skipping ${part.topic} ${part.partition}")
    Iterator.empty
  } else {
    new KafkaRDDIterator(part, context)
  }
}

这里重要的是else子句,它创建了KafkaRDDIterator。这内有:

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(kc.config.props)
  .asInstanceOf[Decoder[K]]
val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
  .newInstance(kc.config.props)
  .asInstanceOf[Decoder[V]]

,如您所见,它可以通过反射创建一个密钥解码器和值解码器的实例,为每个基础分区。这意味着它不是被生成 per sagess ,但 per kafka partition

为什么这样实现?我不知道。我之所以假设是因为与Spark中发生的所有其他分配相比,钥匙和价值解码器应具有可忽略的性能。

如果您已经介绍了您的应用程序并发现这是一个分配热路径,则可以打开问题。否则,我不会担心。

相关内容

最新更新