Kafka消费者在尝试使用Spark处理消息时多次消费消息



我有一个Kafka使用者,它从主题中读取消息,并使用spark将其写入配置单元表。当我在Yarn上运行代码时,它会多次读取相同的消息。我在这个话题上有大约100000条信息。但是,我的消费者会多次阅读相同的内容。当我做一个distinct时,我得到了实际的计数。

这是我写的代码。我想知道我是否错过了任何设置。

val spark = SparkSession.builder()
.appName("Kafka Consumer")
.enableHiveSupport()
.getOrCreate()
import spark.implicits._
val kafkaConsumerProperty = new Properties()
kafkaConsumerProperty.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "---")
kafkaConsumerProperty.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
kafkaConsumerProperty.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
kafkaConsumerProperty.put(ConsumerConfig.GROUP_ID_CONFIG, "draw_attributes")
kafkaConsumerProperty.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
kafkaConsumerProperty.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true")
val topic = "space_orchestrator"
val kafkaConsumer = new KafkaConsumer[String,String](kafkaConsumerProperty)
kafkaConsumer.subscribe(Collections.singletonList(topic))

while(true){
val recordSeq = kafkaConsumer.poll(10000).toSeq.map( x => x.value())
if(!recordSeq.isEmpty)
{
val newDf = spark.read.json(recordSeq.toDS)
newDf.write.mode(SaveMode.Overwrite).saveAsTable("dmart_dev.draw_attributes")
}
}

另一种选择是尝试手动设置偏移量。为此,应禁用自动提交(enable.auto.commit = false(。对于手动提交,KafkaConsumers提供了两种方法,即commitSync()commitAsync()。顾名思义,commitSync((是一个阻塞调用,它在偏移量成功提交后返回,而commitAsync((则立即返回。

最新更新